Fixed Node Controller (NC) memory leak. Fixed duplicate job cleanup. Cleaned up data treatment in Hyracks.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@865 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks-examples/tpch-example/tpchapp/pom.xml
index c65aaf5..05256a3 100644
--- a/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -2,8 +2,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
<artifactId>tpchapp</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>tpch-example</artifactId>
@@ -83,5 +81,10 @@
<version>0.2.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-examples/tpch-example/tpchclient/pom.xml
index c7a138a..d9b5fce 100644
--- a/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -2,8 +2,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
<artifactId>tpchclient</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>tpch-example</artifactId>
@@ -17,6 +15,11 @@
<version>0.2.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index a43aafb..cffa0f4 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -38,8 +38,9 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -117,24 +118,16 @@
CmdLineParser parser = new CmdLineParser(options);
parser.parseArgument(args);
- IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
- options.port);
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
- JobSpecification job = createJob(
- parseFileSplits(options.inFileCustomerSplits),
- parseFileSplits(options.inFileOrderSplits),
- parseFileSplits(options.outFileSplits),
- options.numJoinPartitions, options.algo,
- options.graceInputSize, options.graceRecordsPerFrame,
- options.graceFactor, options.memSize, options.tableSize,
- options.hasGroupBy);
+ JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+ parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+ options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+ options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
long start = System.currentTimeMillis();
- JobId jobId = hcc.createJob(
- options.app,
- job,
- options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
- .noneOf(JobFlag.class));
+ JobId jobId = hcc.createJob(options.app, job,
+ options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
@@ -148,104 +141,65 @@
String s = splits[i].trim();
int idx = s.indexOf(':');
if (idx < 0) {
- throw new IllegalArgumentException("File split " + s
- + " not well formed");
+ throw new IllegalArgumentException("File split " + s + " not well formed");
}
- fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
- new File(s.substring(idx + 1))));
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
}
return fSplits;
}
- private static JobSpecification createJob(FileSplit[] customerSplits,
- FileSplit[] orderSplits, FileSplit[] resultSplits,
- int numJoinPartitions, String algo, int graceInputSize,
- int graceRecordsPerFrame, double graceFactor, int memSize,
- int tableSize, boolean hasGroupBy) throws HyracksDataException {
+ private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+ FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
+ double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
JobSpecification spec = new JobSpecification();
- IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(
- customerSplits);
- RecordDescriptor custDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
- orderSplits);
- RecordDescriptor ordersDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- RecordDescriptor custOrderJoinDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
- spec, ordersSplitsProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
createPartitionConstraint(spec, ordScanner, orderSplits);
- FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(
- spec, custSplitsProvider, new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] {
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'),
- custDesc);
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
createPartitionConstraint(spec, custScanner, customerSplits);
IOperatorDescriptor join;
if ("nestedloop".equalsIgnoreCase(algo)) {
- join = new NestedLoopJoinOperatorDescriptor(spec,
- new JoinComparatorFactory(
- UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- custOrderJoinDesc, memSize);
+ join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
} else if ("gracehash".equalsIgnoreCase(algo)) {
join = new GraceHashJoinOperatorDescriptor(
@@ -256,8 +210,9 @@
graceFactor,
new int[] { 0 },
new int[] { 1 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc);
} else if ("hybridhash".equalsIgnoreCase(algo)) {
@@ -269,8 +224,9 @@
graceFactor,
new int[] { 0 },
new int[] { 1 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc);
} else {
@@ -278,81 +234,71 @@
spec,
new int[] { 0 },
new int[] { 1 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, 6000000);
}
- PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
- numJoinPartitions);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
- IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- new int[] { 1 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(ordJoinConn, ordScanner, 0, join, 1);
- IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- new int[] { 0 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(custJoinConn, custScanner, 0, join, 0);
IOperatorDescriptor endingOp = join;
if (hasGroupBy) {
- RecordDescriptor groupResultDesc = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
spec,
new int[] { 6 },
- new FieldHashPartitionComputerFactory(
- new int[] { 6 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new FieldHashPartitionComputerFactory(new int[] { 6 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
new MultiAggregatorFactory(
new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
groupResultDesc, 16);
createPartitionConstraint(spec, gby, resultSplits);
- IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- new int[] { 6 },
- new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 6 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
spec.connect(joinGroupConn, join, 0, gby, 0);
endingOp = gby;
}
- IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
- resultSplits);
- FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(
- spec, outSplitProvider);
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+ FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
createPartitionConstraint(spec, writer, resultSplits);
- IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(
- spec);
+ IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
spec.addRoot(writer);
return spec;
}
- private static void createPartitionConstraint(JobSpecification spec,
- IOperatorDescriptor op, FileSplit[] splits) {
+ private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
String[] parts = new String[splits.length];
for (int i = 0; i < splits.length; ++i) {
parts[i] = splits[i].getNodeName();
}
- PartitionConstraintHelper
- .addAbsoluteLocationConstraint(spec, op, parts);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
}
static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -362,8 +308,7 @@
private final int pos0;
private final int pos1;
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
- int pos0, int pos1) {
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
this.bFactory = bFactory;
this.pos0 = pos0;
this.pos1 = pos1;
@@ -371,8 +316,7 @@
@Override
public ITuplePairComparator createTuplePairComparator() {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0,
- pos1);
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
}
}
@@ -382,16 +326,14 @@
private final int field0;
private final int field1;
- public JoinComparator(IBinaryComparator bComparator, int field0,
- int field1) {
+ public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
this.bComparator = bComparator;
this.field0 = field0;
this.field1 = field1;
}
@Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0,
- IFrameTupleAccessor accessor1, int tIndex1) {
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
int tStart0 = accessor0.getTupleStartOffset(tIndex0);
int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
@@ -406,9 +348,8 @@
int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
int fLen1 = fEnd1 - fStart1;
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
- + fStartOffset0, fLen0, accessor1.getBuffer().array(),
- fStart1 + fStartOffset1, fLen1);
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+ .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
if (c != 0) {
return c;
}