Add group-by operators (still to be checked)
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2719 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
new file mode 100644
index 0000000..50060d2
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.genomix.core.jobgen.clusterconfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ClusterConfig {
+
+ private static String[] NCs;
+ private static String storePropertiesPath = "conf/stores.properties";
+ private static String clusterPropertiesPath = "conf/cluster.properties";
+ private static Properties clusterProperties = new Properties();
+ private static Map<String, List<String>> ipToNcMapping;
+ private static String[] stores;
+
+    /**
+     * Let tests override the store properties file location.
+     *
+     * @param storePropertiesPath
+     *            path to the stores properties file
+     */
+ public static void setStorePath(String storePropertiesPath) throws HyracksException {
+ ClusterConfig.storePropertiesPath = storePropertiesPath;
+ }
+
+ public static void setClusterPropertiesPath(String clusterPropertiesPath) throws HyracksException {
+ ClusterConfig.clusterPropertiesPath = clusterPropertiesPath;
+ }
+
+    /**
+     * Get the names of the NCs running on one IP address.
+     *
+     * @param ipAddress
+     *            the IP address to look up
+     * @return the NC names mapped to that address, or null if none are known
+     * @throws HyracksException
+     */
+ public static List<String> getNCNames(String ipAddress) throws HyracksException {
+ return ipToNcMapping.get(ipAddress);
+ }
+
+    /**
+     * Get a file split provider with one split per store directory on every
+     * NC, under a per-job, per-index subdirectory.
+     *
+     * @param jobId
+     *            job identifier used in the split path
+     * @param indexName
+     *            index name used in the split path
+     * @return a constant file split provider over all NC/store combinations
+     * @throws HyracksException
+     */
+ public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
+ FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
+ int i = 0;
+ for (String nc : NCs) {
+ for (String st : stores) {
+ FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
+ + File.separator + indexName);
+ fileSplits[i++] = split;
+ }
+ }
+ return new ConstantFileSplitProvider(fileSplits);
+ }
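+
+    // Example (hypothetical values): with NCs {"nc1", "nc2"}, stores {"/data"},
+    // jobId "42" and indexName "kmer", the provider above holds two splits,
+    // /data/nc1-data/42/kmer pinned to nc1 and /data/nc2-data/42/kmer to nc2.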
+
+    private static void loadStores() throws HyracksException {
+        Properties properties = new Properties();
+        try {
+            FileInputStream in = new FileInputStream(storePropertiesPath);
+            try {
+                properties.load(in);
+            } finally {
+                in.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        String store = properties.getProperty("store");
+        if (store == null) {
+            throw new HyracksDataException("missing 'store' property in " + storePropertiesPath);
+        }
+        stores = store.split(",");
+    }
+
+    private static void loadClusterProperties() throws HyracksException {
+        try {
+            FileInputStream in = new FileInputStream(clusterPropertiesPath);
+            try {
+                clusterProperties.load(in);
+            } finally {
+                in.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+ public static int getFrameSize() {
+ return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));
+ }
+
+    /**
+     * Set the location constraint for an operator from the locations of its
+     * input splits, preferring NCs on the same machines as the data.
+     *
+     * @param spec
+     * @param operator
+     * @throws HyracksException
+     */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
+ List<InputSplit> splits) throws HyracksException {
+ int count = splits.size();
+ String[] locations = new String[splits.size()];
+ Random random = new Random(System.currentTimeMillis());
+        for (int i = 0; i < splits.size(); i++) {
+            try {
+                String[] loc = splits.get(i).getLocations();
+                Collections.shuffle(Arrays.asList(loc), random);
+                if (loc.length > 0) {
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
+                    for (InetAddress ip : allIps) {
+                        List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+                        if (ncs != null && !ncs.isEmpty()) {
+                            // prefer an NC co-located with the split's data
+                            locations[i] = ncs.get(random.nextInt(ncs.size()));
+                            break;
+                        }
+                    }
+                }
+                if (locations[i] == null) {
+                    // no co-located NC known; fall back to a random NC
+                    locations[i] = NCs[random.nextInt(NCs.length)];
+                }
+            } catch (IOException e) {
+                throw new HyracksException(e);
+            } catch (InterruptedException e) {
+                throw new HyracksException(e);
+            }
+        }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+ }
+
+    /**
+     * Set the location constraint for an operator: one partition per store
+     * directory on every NC.
+     *
+     * @param spec
+     * @param operator
+     * @throws HyracksException
+     */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+ throws HyracksException {
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length];
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ }
+
+    /**
+     * Set the partition count constraint for an operator to the total number
+     * of store directories across all NCs.
+     *
+     * @param spec
+     * @param operator
+     * @throws HyracksException
+     */
+ public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
+ int count = NCs.length * stores.length;
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+ }
+
+ public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping = new HashMap<String, List<String>>();
+ int i = 0;
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+ if (matchedNCs == null) {
+ matchedNCs = new ArrayList<String>();
+ ipToNcMapping.put(ipAddr, matchedNCs);
+ }
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+
+ loadClusterProperties();
+ loadStores();
+ }
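+
+    /*
+     * Minimal usage sketch (hypothetical host, port, ids and operator; the
+     * actual values come from the deployment):
+     *
+     *   ClusterConfig.loadClusterConfig("localhost", 1098);
+     *   JobSpecification spec = new JobSpecification();
+     *   IFileSplitProvider splits = ClusterConfig.getFileSplitProvider("job-1", "kmer");
+     *   ClusterConfig.setLocationConstraint(spec, someOperator);
+     */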
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
new file mode 100644
index 0000000..9092517
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
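+/**
+ * A family of polynomial string hash functions; despite the class name, the
+ * input bytes are interpreted as a UTF-8 string, not as a long.
+ */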
+public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
+ public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
+
+ private static final long serialVersionUID = 1L;
+
+    private static final int[] primeCoefficients = { 31, 23, 53, 97, 71, 337, 11, 877, 3, 29 };
+
+ private LongHashFunctionFamily() {
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(int seed) {
+        // pick a multiplier and a prime modulus from the table (assumes seed >= 0)
+        final int coefficient = primeCoefficients[seed % primeCoefficients.length];
+        final int r = primeCoefficients[(seed + 1) % primeCoefficients.length];
+
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+                // polynomial rolling hash over the string's characters, mod a prime
+                int h = 0;
+                int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
+                int sStart = offset + 2; // skip the 2-byte UTF length header
+ int c = 0;
+
+ while (c < utflen) {
+ char ch = UTF8StringPointable.charAt(bytes, sStart + c);
+ h = (coefficient * h + ch) % r;
+ c += UTF8StringPointable.charSize(bytes, sStart + c);
+ }
+ return h;
+ }
+ };
+ }
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..dd4ca25
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+ private static final long serialVersionUID = 1L;
+
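+    // mixing constants from the 32-bit MurmurHash3 reference implementation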
+ private static final int C1 = 0xcc9e2d51;
+ private static final int C2 = 0x1b873593;
+ private static final int C3 = 5;
+ private static final int C4 = 0xe6546b64;
+ private static final int C5 = 0x85ebca6b;
+ private static final int C6 = 0xc2b2ae35;
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int h = seed;
+ int p = offset;
+ int remain = length;
+                // body: mix one little-endian 4-byte block at a time; mask each
+                // byte to avoid sign extension, and use >= 4 so a trailing full
+                // block is not silently dropped
+                while (remain >= 4) {
+                    int k = (bytes[p] & 0xff) | ((bytes[p + 1] & 0xff) << 8) | ((bytes[p + 2] & 0xff) << 16)
+                            | ((bytes[p + 3] & 0xff) << 24);
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                    h = Integer.rotateLeft(h, 13);
+                    h = h * C3 + C4;
+                    p += 4;
+                    remain -= 4;
+                }
+                // tail: fold in the 1-3 leftover bytes (cases fall through)
+                int k = 0;
+                switch (remain) {
+                    case 3:
+                        k = bytes[p++] & 0xff;
+                    case 2:
+                        k = (k << 8) | (bytes[p++] & 0xff);
+                    case 1:
+                        k = (k << 8) | (bytes[p++] & 0xff);
+                        k *= C1;
+                        k = Integer.rotateLeft(k, 15);
+                        k *= C2;
+                        h ^= k;
+                        h = Integer.rotateLeft(h, 13);
+                        h = h * C3 + C4;
+                }
+ h ^= length;
+ h ^= (h >>> 16);
+ h *= C5;
+ h ^= (h >>> 13);
+ h *= C6;
+ h ^= (h >>> 16);
+ return h;
+ }
+ };
+ }
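+
+    /*
+     * Minimal usage sketch (hypothetical buffer and offsets):
+     *
+     *   IBinaryHashFunctionFamily family = new MurmurHash3BinaryHashFunctionFamily();
+     *   IBinaryHashFunction f = family.createBinaryHashFunction(0);
+     *   int h = f.hash(data, fieldStart, fieldLength);
+     *
+     * Different seeds yield independent functions, which multi-level
+     * hash-based operators rely on when they re-partition.
+     */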
+}
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..78c37b7
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.genomix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
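+/**
+ * Assigns connector policies for generated jobs: m-to-n partitioning/merging
+ * connectors materialize on the send side so the merging receive cannot stall
+ * its producers, while all other connectors simply pipeline.
+ */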
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+    private final IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private final IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
+}