Group by operators to be checked

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2719 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
new file mode 100644
index 0000000..50060d2
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -0,0 +1,207 @@
+package edu.uci.ics.genomix.core.jobgen.clusterconfig;

+

+import java.io.File;

+import java.io.FileInputStream;

+import java.io.IOException;

+import java.net.InetAddress;

+import java.util.ArrayList;

+import java.util.Arrays;

+import java.util.Collections;

+import java.util.HashMap;

+import java.util.List;

+import java.util.Map;

+import java.util.Properties;

+import java.util.Random;

+

+import org.apache.hadoop.mapreduce.InputSplit;

+

+import edu.uci.ics.hyracks.api.client.HyracksConnection;

+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;

+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;

+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.exceptions.HyracksException;

+import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;

+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;

+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;

+

+public class ClusterConfig {

+

+    private static String[] NCs;

+    private static String storePropertiesPath = "conf/stores.properties";

+    private static String clusterPropertiesPath = "conf/cluster.properties";

+    private static Properties clusterProperties = new Properties();

+    private static Map<String, List<String>> ipToNcMapping;

+    private static String[] stores;

+

+    /**

+     * let tests set config path to be whatever

+     * 

+     * @param propertiesPath

+     *            stores properties file path

+     */

+    public static void setStorePath(String storePropertiesPath) throws HyracksException {

+        ClusterConfig.storePropertiesPath = storePropertiesPath;

+    }

+

+    public static void setClusterPropertiesPath(String clusterPropertiesPath) throws HyracksException {

+        ClusterConfig.clusterPropertiesPath = clusterPropertiesPath;

+    }

+

+    /**

+     * get NC names running on one IP address

+     * 

+     * @param ipAddress

+     * @return

+     * @throws HyracksDataException

+     */

+    public static List<String> getNCNames(String ipAddress) throws HyracksException {

+        return ipToNcMapping.get(ipAddress);

+    }

+

+    /**

+     * get file split provider

+     * 

+     * @param jobId

+     * @return

+     * @throws HyracksDataException

+     */

+    public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {

+        FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];

+        int i = 0;

+        for (String nc : NCs) {

+            for (String st : stores) {

+                FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId

+                        + File.separator + indexName);

+                fileSplits[i++] = split;

+            }

+        }

+        return new ConstantFileSplitProvider(fileSplits);

+    }

+

+    private static void loadStores() throws HyracksException {

+        Properties properties = new Properties();

+        try {

+            properties.load(new FileInputStream(storePropertiesPath));

+        } catch (IOException e) {

+            throw new HyracksDataException(e);

+        }

+        String store = properties.getProperty("store");

+        stores = store.split(",");

+    }

+

+    private static void loadClusterProperties() throws HyracksException {

+        try {

+            clusterProperties.load(new FileInputStream(clusterPropertiesPath));

+        } catch (IOException e) {

+            throw new HyracksDataException(e);

+        }

+    }

+

+    public static int getFrameSize() {

+        return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));

+    }

+

+    /**

+     * set location constraint

+     * 

+     * @param spec

+     * @param operator

+     * @throws HyracksDataException

+     */

+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,

+            List<InputSplit> splits) throws HyracksException {

+        int count = splits.size();

+        String[] locations = new String[splits.size()];

+        Random random = new Random(System.currentTimeMillis());

+        for (int i = 0; i < splits.size(); i++) {

+            try {

+                String[] loc = splits.get(i).getLocations();

+                Collections.shuffle(Arrays.asList(loc), random);

+                if (loc.length > 0) {

+                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);

+                    for (InetAddress ip : allIps) {

+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {

+                            List<String> ncs = ipToNcMapping.get(ip.getHostAddress());

+                            int pos = random.nextInt(ncs.size());

+                            locations[i] = ncs.get(pos);

+                        } else {

+                            int pos = random.nextInt(NCs.length);

+                            locations[i] = NCs[pos];

+                        }

+                    }

+                } else {

+                    int pos = random.nextInt(NCs.length);

+                    locations[i] = NCs[pos];

+                }

+            } catch (IOException e) {

+                throw new HyracksException(e);

+            } catch (InterruptedException e) {

+                throw new HyracksException(e);

+            }

+        }

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);

+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);

+    }

+

+    /**

+     * set location constraint

+     * 

+     * @param spec

+     * @param operator

+     * @throws HyracksDataException

+     */

+    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)

+            throws HyracksException {

+        int count = 0;

+        String[] locations = new String[NCs.length * stores.length];

+        for (String nc : NCs) {

+            for (int i = 0; i < stores.length; i++) {

+                locations[count] = nc;

+                count++;

+            }

+        }

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);

+    }

+

+    /**

+     * set location constraint

+     * 

+     * @param spec

+     * @param operator

+     * @throws HyracksDataException

+     */

+    public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {

+        int count = NCs.length * stores.length;

+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);

+    }

+

+    public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {

+        try {

+            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();

+            NCs = new String[ncNameToNcInfos.size()];

+            ipToNcMapping = new HashMap<String, List<String>>();

+            int i = 0;

+            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {

+                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())

+                        .getHostAddress();

+                List<String> matchedNCs = ipToNcMapping.get(ipAddr);

+                if (matchedNCs == null) {

+                    matchedNCs = new ArrayList<String>();

+                    ipToNcMapping.put(ipAddr, matchedNCs);

+                }

+                matchedNCs.add(entry.getKey());

+                NCs[i] = entry.getKey();

+                i++;

+            }

+        } catch (Exception e) {

+            throw new IllegalStateException(e);

+        }

+

+        loadClusterProperties();

+        loadStores();

+    }

+}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
new file mode 100644
index 0000000..9092517
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -0,0 +1,42 @@
+

+package edu.uci.ics.genomix.data.std.accessors;

+

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;

+

+public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {

+    public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();

+

+    private static final long serialVersionUID = 1L;

+

+    static final int[] primeCoefficents = { 31, 23, 53, 97, 71, 337, 11, 877, 3, 29 };

+

+    private LongHashFunctionFamily() {

+    }

+

+    @Override

+    public IBinaryHashFunction createBinaryHashFunction(int seed) {

+        final int coefficient = primeCoefficents[seed % primeCoefficents.length];

+        final int r = primeCoefficents[(seed + 1) % primeCoefficents.length];

+

+        return new IBinaryHashFunction() {

+            @Override

+            public int hash(byte[] bytes, int offset, int length) {

+                int h = 0;

+                int utflen = UTF8StringPointable.getUTFLength(bytes, offset);

+                int sStart = offset + 2;

+                int c = 0;

+

+                while (c < utflen) {

+                    char ch = UTF8StringPointable.charAt(bytes, sStart + c);

+                    h = (coefficient * h + ch) % r;

+                    c += UTF8StringPointable.charSize(bytes, sStart + c);

+                }

+                return h;

+            }

+        };

+    }

+}

+

+

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..dd4ca25
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.genomix.data.std.accessors;

+

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

+

/**
 * A family of MurmurHash3-style (x86 32-bit variant) hash functions over a
 * byte range; the seed parameterizes the family.
 *
 * NOTE(review): this implementation deviates from the reference MurmurHash3 in
 * two ways (see inline notes): 4-byte blocks use {@code while (remain > 4)}
 * rather than {@code >= 4}, and byte loads are sign-extended. Both affect the
 * produced hash values, so do not "fix" them without updating every copy of
 * this function the cluster relies on to agree — TODO confirm against the
 * hyracks-side implementation.
 */
public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
    private static final long serialVersionUID = 1L;

    // Mixing constants from the MurmurHash3 x86_32 reference.
    private static final int C1 = 0xcc9e2d51;
    private static final int C2 = 0x1b873593;
    private static final int C3 = 5;
    private static final int C4 = 0xe6546b64;
    private static final int C5 = 0x85ebca6b;
    private static final int C6 = 0xc2b2ae35;

    @Override
    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
        return new IBinaryHashFunction() {
            @Override
            public int hash(byte[] bytes, int offset, int length) {
                int h = seed;
                int p = offset;
                int remain = length;
                // Body: mix 4-byte little-endian blocks.
                // NOTE(review): `> 4` (not `>= 4`) leaves remain == 4 for the tail
                // switch below, which only handles 1..3 bytes — the last 4 bytes of
                // inputs whose length is a multiple of 4 are never mixed in.
                while (remain > 4) {
                    // NOTE(review): casts sign-extend; negative bytes OR into the
                    // high bits (reference Murmur3 masks each byte with 0xff).
                    int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8) | (((int) bytes[p + 2]) << 16)
                            | (((int) bytes[p + 3]) << 24);
                    k *= C1;
                    k = Integer.rotateLeft(k, 15);
                    k *= C2;
                    h ^= k;
                    h = Integer.rotateLeft(h, 13);
                    h = h * C3 + C4;
                    p += 4;
                    remain -= 4;
                }
                // Tail: fold the remaining 1..3 bytes; fall-through is intentional.
                int k = 0;
                switch (remain) {
                    case 3:
                        k = bytes[p++];
                    case 2:
                        k = (k << 8) | bytes[p++];
                    case 1:
                        k = (k << 8) | bytes[p++];
                        k *= C1;
                        k = Integer.rotateLeft(k, 15);
                        k *= C2;
                        h ^= k;
                        h = Integer.rotateLeft(h, 13);
                        h = h * C3 + C4;
                }
                // Finalization: length mix plus avalanche shifts/multiplies.
                h ^= length;
                h ^= (h >>> 16);
                h *= C5;
                h ^= (h >>> 13);
                h *= C6;
                h ^= (h >>> 16);
                return h;
            }
        };
    }
}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..78c37b7
--- /dev/null
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.genomix.dataflow;

+

+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;

+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;

+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;

+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;

+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

+

+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {

+    private static final long serialVersionUID = 1L;

+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();

+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();

+

+    @Override

+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,

+            int[] fanouts) {

+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {

+            return senderSideMaterializePolicy;

+        } else {

+            return pipeliningPolicy;

+        }

+    }

+}