fix HashSpillableTableFactory: the writer was opened twice
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2721 123451ca-8445-de46-9d55-352943316053
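
A minimal sketch (hypothetical names, not the Hyracks API) of the double-open hazard this commit removes: the operator that owns the frame writer already calls open(), so if the spillable table's flush path calls open() again the writer's lifecycle breaks. The fix below in HashSpillableTableFactory simply stops the flush path from re-opening the caller-owned writer.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a frame writer with an open/write/close lifecycle.
interface SimpleFrameWriter {
    void open();
    void writeFrame(ByteBuffer buffer);
    void close();
}

// Guards against the exact bug named in the commit message: opening twice.
class GuardedFrameWriter implements SimpleFrameWriter {
    private boolean opened = false;

    @Override
    public void open() {
        if (opened) {
            throw new IllegalStateException("writer opened twice");
        }
        opened = true;
    }

    @Override
    public void writeFrame(ByteBuffer buffer) {
        if (!opened) {
            throw new IllegalStateException("writer not opened");
        }
        // ... emit the frame downstream ...
    }

    @Override
    public void close() {
        opened = false;
    }
}

// Sketch of the corrected flush path: the caller owns open()/close(),
// so flushing only writes frames and never re-opens the writer.
class SpillableTableSketch {
    void flushFrames(SimpleFrameWriter writer, ByteBuffer outputFrame) {
        // writer.open();  // removed: would be the second open()
        writer.writeFrame(outputFrame);
    }
}
```
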
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
deleted file mode 100644
index 50060d2..0000000
--- a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package edu.uci.ics.genomix.core.jobgen.clusterconfig;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class ClusterConfig {
-
- private static String[] NCs;
- private static String storePropertiesPath = "conf/stores.properties";
- private static String clusterPropertiesPath = "conf/cluster.properties";
- private static Properties clusterProperties = new Properties();
- private static Map<String, List<String>> ipToNcMapping;
- private static String[] stores;
-
- /**
- * let tests set config path to be whatever
- *
- * @param propertiesPath
- * stores properties file path
- */
- public static void setStorePath(String storePropertiesPath) throws HyracksException {
- ClusterConfig.storePropertiesPath = storePropertiesPath;
- }
-
- public static void setClusterPropertiesPath(String clusterPropertiesPath) throws HyracksException {
- ClusterConfig.clusterPropertiesPath = clusterPropertiesPath;
- }
-
- /**
- * get NC names running on one IP address
- *
- * @param ipAddress
- * @return
- * @throws HyracksDataException
- */
- public static List<String> getNCNames(String ipAddress) throws HyracksException {
- return ipToNcMapping.get(ipAddress);
- }
-
- /**
- * get file split provider
- *
- * @param jobId
- * @return
- * @throws HyracksDataException
- */
- public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
- FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
- int i = 0;
- for (String nc : NCs) {
- for (String st : stores) {
- FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
- + File.separator + indexName);
- fileSplits[i++] = split;
- }
- }
- return new ConstantFileSplitProvider(fileSplits);
- }
-
- private static void loadStores() throws HyracksException {
- Properties properties = new Properties();
- try {
- properties.load(new FileInputStream(storePropertiesPath));
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- String store = properties.getProperty("store");
- stores = store.split(",");
- }
-
- private static void loadClusterProperties() throws HyracksException {
- try {
- clusterProperties.load(new FileInputStream(clusterPropertiesPath));
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public static int getFrameSize() {
- return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));
- }
-
- /**
- * set location constraint
- *
- * @param spec
- * @param operator
- * @throws HyracksDataException
- */
- public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
- List<InputSplit> splits) throws HyracksException {
- int count = splits.size();
- String[] locations = new String[splits.size()];
- Random random = new Random(System.currentTimeMillis());
- for (int i = 0; i < splits.size(); i++) {
- try {
- String[] loc = splits.get(i).getLocations();
- Collections.shuffle(Arrays.asList(loc), random);
- if (loc.length > 0) {
- InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
- for (InetAddress ip : allIps) {
- if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
- int pos = random.nextInt(ncs.size());
- locations[i] = ncs.get(pos);
- } else {
- int pos = random.nextInt(NCs.length);
- locations[i] = NCs[pos];
- }
- }
- } else {
- int pos = random.nextInt(NCs.length);
- locations[i] = NCs[pos];
- }
- } catch (IOException e) {
- throw new HyracksException(e);
- } catch (InterruptedException e) {
- throw new HyracksException(e);
- }
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
- }
-
- /**
- * set location constraint
- *
- * @param spec
- * @param operator
- * @throws HyracksDataException
- */
- public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
- throws HyracksException {
- int count = 0;
- String[] locations = new String[NCs.length * stores.length];
- for (String nc : NCs) {
- for (int i = 0; i < stores.length; i++) {
- locations[count] = nc;
- count++;
- }
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
- }
-
- /**
- * set location constraint
- *
- * @param spec
- * @param operator
- * @throws HyracksDataException
- */
- public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
- int count = NCs.length * stores.length;
- PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
- }
-
- public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
- NCs = new String[ncNameToNcInfos.size()];
- ipToNcMapping = new HashMap<String, List<String>>();
- int i = 0;
- for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
- String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
- .getHostAddress();
- List<String> matchedNCs = ipToNcMapping.get(ipAddr);
- if (matchedNCs == null) {
- matchedNCs = new ArrayList<String>();
- ipToNcMapping.put(ipAddr, matchedNCs);
- }
- matchedNCs.add(entry.getKey());
- NCs[i] = entry.getKey();
- i++;
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
-
- loadClusterProperties();
- loadStores();
- }
-}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
deleted file mode 100644
index dd4ca25..0000000
--- a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-
-public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
- private static final long serialVersionUID = 1L;
-
- private static final int C1 = 0xcc9e2d51;
- private static final int C2 = 0x1b873593;
- private static final int C3 = 5;
- private static final int C4 = 0xe6546b64;
- private static final int C5 = 0x85ebca6b;
- private static final int C6 = 0xc2b2ae35;
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction(final int seed) {
- return new IBinaryHashFunction() {
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- int h = seed;
- int p = offset;
- int remain = length;
- while (remain > 4) {
- int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8) | (((int) bytes[p + 2]) << 16)
- | (((int) bytes[p + 3]) << 24);
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- h = Integer.rotateLeft(h, 13);
- h = h * C3 + C4;
- p += 4;
- remain -= 4;
- }
- int k = 0;
- switch (remain) {
- case 3:
- k = bytes[p++];
- case 2:
- k = (k << 8) | bytes[p++];
- case 1:
- k = (k << 8) | bytes[p++];
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- h = Integer.rotateLeft(h, 13);
- h = h * C3 + C4;
- }
- h ^= length;
- h ^= (h >>> 16);
- h *= C5;
- h ^= (h >>> 13);
- h *= C6;
- h ^= (h >>> 16);
- return h;
- }
- };
- }
-}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index e7a8e6c..493ae1e 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -73,7 +73,7 @@
String s = "G:" + File.separator + "data"
+ File.separator + "filename.txt";*/
- String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";
+ String s = "/Users/yingyib/data/filename" + String.valueOf(temp) + ".txt";
filenames = new FileInputStream(s);
// filenames = new FileInputStream("filename.txt");
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index e6a6ebb..067452c 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,12 +1,8 @@
package edu.uci.ics.genomix.dataflow;
-import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -14,7 +10,7 @@
import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -27,15 +23,11 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -44,9 +36,7 @@
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -55,7 +45,6 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
@@ -63,307 +52,367 @@
public class Tester {
- private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());
- public static final String NC1_ID = "nc1";
- public static final String NC2_ID = "nc2";
- public static final String NC3_ID = "nc3";
- public static final String NC4_ID = "nc4";
+ private static final Logger LOGGER = Logger.getLogger(Tester.class
+ .getName());
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+ public static final String NC3_ID = "nc3";
+ public static final String NC4_ID = "nc4";
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static NodeControllerService nc2;
- private static NodeControllerService nc3;
- private static NodeControllerService nc4;
- private static IHyracksClientConnection hcc;
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static NodeControllerService nc3;
+ private static NodeControllerService nc4;
+ private static IHyracksClientConnection hcc;
- //private static final boolean DEBUG = true;
+ // private static final boolean DEBUG = true;
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
- LOGGER.setLevel(Level.OFF);
+ LOGGER.setLevel(Level.OFF);
- init();
+ init();
- // Options options = new Options();
+ // Options options = new Options();
- IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
+ IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
- /*
- * JobSpecification job =
- * createJob(parseFileSplits(options.inFileCustomerSplits),
- * parseFileSplits(options.inFileOrderSplits),
- * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
- * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- * options.graceFactor, options.memSize, options.tableSize,
- * options.hasGroupBy);
- */
+ /*
+ * JobSpecification job =
+ * createJob(parseFileSplits(options.inFileCustomerSplits),
+ * parseFileSplits(options.inFileOrderSplits),
+ * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
+ * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+ * options.graceFactor, options.memSize, options.tableSize,
+ * options.hasGroupBy);
+ */
- int k, page_num;
- String file_name = args[0];
- k = Integer.parseInt(args[1]);
- page_num = Integer.parseInt(args[2]);
- int type = Integer.parseInt(args[3]);
+ int k, page_num;
+ String file_name = args[0];
+ k = Integer.parseInt(args[1]);
+ page_num = Integer.parseInt(args[2]);
+ int type = Integer.parseInt(args[3]);
- JobSpecification job = createJob(file_name, k, page_num, type);
+ JobSpecification job = createJob(file_name, k, page_num, type);
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("test", job);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
-
- FileOutputStream filenames;
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob("test", job);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
- String s = "g:\\data\\results.txt" ;
+ FileOutputStream filenames;
- filenames = new FileOutputStream(s);
- // filenames = new FileInputStream("filename.txt");
+ String s = "/Users/yingyib/data/results.txt";
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));
- writer.write((int) (end-start));
- writer.close();
-
- }
+ filenames = new FileOutputStream(s);
+ // filenames = new FileInputStream("filename.txt");
- public static void init() throws Exception {
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = 10000;
- File outDir = new File("target/ClusterController");
- outDir.mkdirs();
- File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);
- ccRoot.delete();
- ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
- cc = new ClusterControllerService(ccConfig);
- cc.start();
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ filenames));
+ writer.write((int) (end - start));
+ writer.close();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
+ }
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- nc2 = new NodeControllerService(ncConfig2);
- nc2.start();
-
- NCConfig ncConfig3 = new NCConfig();
- ncConfig3.ccHost = "localhost";
- ncConfig3.ccPort = 39001;
- ncConfig3.clusterNetIPAddress = "127.0.0.1";
- ncConfig3.dataIPAddress = "127.0.0.1";
- ncConfig3.nodeId = NC3_ID;
- nc3 = new NodeControllerService(ncConfig3);
- nc3.start();
-
- NCConfig ncConfig4 = new NCConfig();
- ncConfig4.ccHost = "localhost";
- ncConfig4.ccPort = 39001;
- ncConfig4.clusterNetIPAddress = "127.0.0.1";
- ncConfig4.dataIPAddress = "127.0.0.1";
- ncConfig4.nodeId = NC4_ID;
- nc4 = new NodeControllerService(ncConfig4);
- nc4.start();
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = -1;
+ File outDir = new File("target/ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(Tester.class.getName(), ".data",
+ outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication("test", null);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
- }
- }
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ ncConfig1.ioDevices = "/tmp/t1";
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
- private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ ncConfig2.ioDevices = "/tmp/t2";
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
- spec.setFrameSize(32768);
+ NCConfig ncConfig3 = new NCConfig();
+ ncConfig3.ccHost = "localhost";
+ ncConfig3.ccPort = 39001;
+ ncConfig3.clusterNetIPAddress = "127.0.0.1";
+ ncConfig3.dataIPAddress = "127.0.0.1";
+ ncConfig3.nodeId = NC3_ID;
+ ncConfig3.ioDevices = "/tmp/t3";
+ nc3 = new NodeControllerService(ncConfig3);
+ nc3.start();
- FileScanDescriptor scan = new FileScanDescriptor(spec, k);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
+ NCConfig ncConfig4 = new NCConfig();
+ ncConfig4.ccHost = "localhost";
+ ncConfig4.ccPort = 39001;
+ ncConfig4.clusterNetIPAddress = "127.0.0.1";
+ ncConfig4.dataIPAddress = "127.0.0.1";
+ ncConfig4.nodeId = NC4_ID;
+ ncConfig4.ioDevices = "/tmp/t4";
+ nc4 = new NodeControllerService(ncConfig4);
+ nc4.start();
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
+ ccConfig.clientNetPort);
+ hcc.createApplication("test", null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4096;
- int tableSize = 10485767;
+ private static JobSpecification createJob(String filename, int k,
+ int page_num, int type) throws HyracksDataException {
+ JobSpecification spec = new JobSpecification();
- AbstractOperatorDescriptor single_grouper;
- IConnectorDescriptor conn_partition;
- AbstractOperatorDescriptor cross_grouper;
-
+ spec.setFrameSize(32768);
-
- if(0 == type){//external group by
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ FileScanDescriptor scan = new FileScanDescriptor(spec, k);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ // NC1_ID);
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
-
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
- }
- else if( 1 == type){
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }), tableSize), true);
- conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),
- keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY)} );
- cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(),
- outputRec);
- }
- else{
- long inputSizeInRawRecords = 154000000;
- long inputSizeInUniqueKeys = 38500000;
- int recordSizeInBytes = 9;
- int hashfuncStartLevel = 1;
- single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new MurmurHash3BinaryHashFunctionFamily()},
- hashfuncStartLevel,
- new Integer64NormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- recordSizeInBytes = 13;
- cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new MurmurHash3BinaryHashFunctionFamily()},
- hashfuncStartLevel,
- new Integer64NormalizedKeyComputerFactory(),
- new DistributedMergeLmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- }
-
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(readfileConn, scan, 0, single_grouper, 0);
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4096;
+ int tableSize = 10485767;
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+ AbstractOperatorDescriptor single_grouper;
+ IConnectorDescriptor conn_partition;
+ AbstractOperatorDescriptor cross_grouper;
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+ if (0 == type) {// external group by
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(printConn, cross_grouper, 0, printer, 0);
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }),
+ tableSize), true);
- spec.addRoot(printer);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ cross_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
- if( 1 == type ){
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- // System.out.println(spec.toString());
- return spec;
- }
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }),
+ tableSize), true);
+ } else if (1 == type) {
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new Integer64NormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(LongPointable.FACTORY) }),
+ tableSize), true);
+ conn_partition = new MToNPartitioningMergingConnectorDescriptor(
+ spec,
+ new KmerHashPartitioncomputerFactory(),
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) });
+ cross_grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), outputRec);
+ } else {
+ long inputSizeInRawRecords = 154000000;
+ long inputSizeInUniqueKeys = 38500000;
+ int recordSizeInBytes = 9;
+ int hashfuncStartLevel = 1;
+ single_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+ hashfuncStartLevel,
+ new Integer64NormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ recordSizeInBytes = 13;
+ cross_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+ hashfuncStartLevel,
+ new Integer64NormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ }
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // single_grouper, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(readfileConn, scan, 0, single_grouper, 0);
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // cross_grouper,NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // printer, NC1_ID);
- @Override
- public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
- }
- }
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(printConn, cross_grouper, 0, printer, 0);
- static class JoinComparator implements ITuplePairComparator {
+ spec.addRoot(printer);
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
+ if (1 == type) {
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ // System.out.println(spec.toString());
+ return spec;
+ }
- public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
+ int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
+ @Override
+ public ITuplePairComparator createTuplePairComparator(
+ IHyracksTaskContext ctx) {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0,
+ pos1);
+ }
+ }
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
+ static class JoinComparator implements ITuplePairComparator {
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
+
+ public JoinComparator(IBinaryComparator bComparator, int field0,
+ int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0,
+ IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
+ + fStartOffset0, fLen0, accessor1.getBuffer().array(),
+ fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
+ }
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..974ef3b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -255,8 +255,7 @@
}
outputAppender.reset(outputFrame, true);
-
- writer.open();
+ //writer.open();
if (tPointers == null) {
// Not sorted