Fix for issue 257: Local file system adapter requires end-user to use a node controller id for identifying the source file system. (Reviewer: Zach)

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@1244 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/optimizer/OptimizerTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/optimizer/OptimizerTest.java
index 43c6c0c..10cba14 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/optimizer/OptimizerTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/optimizer/OptimizerTest.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.asterix.api.java.AsterixJavaClient;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
+import edu.uci.ics.asterix.external.util.IdentitiyResolverFactory;
 import edu.uci.ics.asterix.test.base.AsterixTestHelper;
 import edu.uci.ics.asterix.test.common.TestHelper;
 
@@ -56,6 +58,10 @@
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
         AsterixHyracksIntegrationUtil.init();
+        // Set the node resolver to be the identity resolver that expects node names 
+        // to be node controller ids; a valid assumption in test environment. 
+        System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+                IdentitiyResolverFactory.class.getName());
     }
 
     @AfterClass
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 8eafd02..c705ea6 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -17,6 +17,8 @@
 
 import edu.uci.ics.asterix.api.common.AsterixHyracksIntegrationUtil;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
+import edu.uci.ics.asterix.external.util.IdentitiyResolverFactory;
 import edu.uci.ics.asterix.test.aql.TestsUtils;
 import edu.uci.ics.asterix.testframework.context.TestCaseContext;
 import edu.uci.ics.asterix.testframework.xml.TestCase.CompilationUnit;
@@ -51,8 +53,14 @@
 
         AsterixHyracksIntegrationUtil.init();
 
-        // TODO: Uncomment when hadoop version is upgraded and adapters are ported
+        // TODO: Uncomment when hadoop version is upgraded and adapters are
+        // ported. 
         HDFSCluster.getInstance().setup();
+
+        // Set the node resolver to be the identity resolver that expects node names 
+        // to be node controller ids; a valid assumption in test environment. 
+        System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
+                IdentitiyResolverFactory.class.getName());
     }
 
     @AfterClass
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index e46705d..9f8cedc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -17,8 +17,13 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.util.DNSResolverFactory;
+import edu.uci.ics.asterix.external.util.INodeResolver;
+import edu.uci.ics.asterix.external.util.INodeResolverFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -36,12 +41,17 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected ITupleParserFactory parserFactory;
-    protected ITupleParser parser;
-
+    public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
     public static final String KEY_DELIMITER = "delimiter";
     public static final String KEY_PATH = "path";
 
+    protected ITupleParserFactory parserFactory;
+    protected ITupleParser parser;
+    protected static INodeResolver nodeResolver;
+
+    private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
+    private static final Logger LOGGER = Logger.getLogger(FileSystemBasedAdapter.class.getName());
+
     public abstract InputStream getInputStream(int partition) throws IOException;
 
     public FileSystemBasedAdapter(IAType atype) {
@@ -118,4 +128,32 @@
         }
 
     }
+
+    protected INodeResolver getNodeResolver() {
+        if (nodeResolver == null) {
+            nodeResolver = initNodeResolver();
+        }
+        return nodeResolver;
+    }
+
+    private static INodeResolver initNodeResolver() {
+        INodeResolver nodeResolver = null;
+        String configuredNodeResolverFactory = System.getProperty(NODE_RESOLVER_FACTORY_PROPERTY);
+        if (configuredNodeResolverFactory != null) {
+            try {
+                nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
+                        .createNodeResolver();
+
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unable to create node resolver from the configured classname "
+                            + configuredNodeResolverFactory + "\n" + e.getMessage());
+                }
+                nodeResolver = DEFAULT_NODE_RESOLVER;
+            }
+        } else {
+            nodeResolver = DEFAULT_NODE_RESOLVER;
+        }
+        return nodeResolver;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index bcc90c8..ae9b412 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -21,6 +21,7 @@
 import java.io.InputStream;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -60,7 +61,7 @@
         return AdapterType.READ;
     }
 
-    private void configureFileSplits(String[] splits) {
+    private void configureFileSplits(String[] splits)  {
         if (fileSplits == null) {
             fileSplits = new FileSplit[splits.length];
             String nodeName;
@@ -77,10 +78,12 @@
         }
     }
 
-    private void configurePartitionConstraint() {
+    private void configurePartitionConstraint() throws AsterixException {
         String[] locs = new String[fileSplits.length];
+        String location;
         for (int i = 0; i < fileSplits.length; i++) {
-            locs[i] = fileSplits[i].getNodeName();
+            location = getNodeResolver().resolveNode(fileSplits[i].getNodeName());
+            locs[i] = location;
         }
         partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locs);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
index 29e4486..8314267 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
@@ -43,9 +43,7 @@
                 for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
                     queue.put(feedMessage);
                 }
-            } else {
-                throw new AsterixException("Unable to deliver message. Unknown feed :" + feedId);
-            }
+            } 
         } catch (Exception e) {
             throw new AsterixException(e);
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolver.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolver.java
new file mode 100644
index 0000000..ff6bbdf
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolver.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+import java.util.Random;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+
+/**
+ * Resolves a value (DNS/IP Address) to the id of a Node Controller running at the location.
+ */
+public class DNSResolver implements INodeResolver {
+
+    private static Random random = new Random();
+
+    @Override
+    public String resolveNode(String value) throws AsterixException {
+        try {
+            String ipAddress = AsterixRuntimeUtil.getIPAddress(value);
+            Set<String> nodeControllers = AsterixRuntimeUtil.getNodeControllersOnIP(ipAddress);
+            if (nodeControllers == null || nodeControllers.isEmpty()) {
+                throw new AsterixException(" No node controllers found at the address: " + value);
+            }
+            String chosenNCId = nodeControllers.toArray(new String[]{})[random
+                    .nextInt(nodeControllers.size())];
+            return chosenNCId;
+        } catch (AsterixException ae) {
+            throw ae;
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolverFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolverFactory.java
new file mode 100644
index 0000000..6b56601
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/DNSResolverFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+/**
+ * Factory for creating instance of {@link DNSResolver}
+ */
+public class DNSResolverFactory implements INodeResolverFactory {
+
+    private static final INodeResolver INSTANCE = new DNSResolver();
+
+    @Override
+    public INodeResolver createNodeResolver() {
+        return INSTANCE;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolver.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolver.java
new file mode 100644
index 0000000..d0e8a64
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolver.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * A policy for resolving a name to a node controller id.   
+ *
+ */
+public interface INodeResolver {
+
+    /**
+     * Resolve a passed-in value to a node controller id.
+     * 
+     * @param value
+     *            string to be resolved
+     * @return resolved result (a node controller id)
+     * @throws AsterixException
+     */
+    public String resolveNode(String value) throws AsterixException;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolverFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolverFactory.java
new file mode 100644
index 0000000..2abde9c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/INodeResolverFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+/**
+ * Factory for creating an instance of INodeResolver
+ * 
+ *  @see INodeResolver
+ */
+public interface INodeResolverFactory {
+
+    /**
+     * Create an instance of {@link INodeResolver}
+     * 
+     * @return an instance of INodeResolver
+     */
+    public INodeResolver createNodeResolver();
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentitiyResolverFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentitiyResolverFactory.java
new file mode 100644
index 0000000..5161203
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentitiyResolverFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+/**
+ * Factory for creating an instance of @see {IdentityResolver}.
+ * Identity resolver simply resolves a value to itself and is useful when value being resolved
+ * is a node controller id.
+ */
+public class IdentitiyResolverFactory implements INodeResolverFactory {
+
+    private static INodeResolver INSTANCE = new IdentityResolver();
+
+    @Override
+    public INodeResolver createNodeResolver() {
+        return INSTANCE;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentityResolver.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentityResolver.java
new file mode 100644
index 0000000..7ff1f9b
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/IdentityResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.util;
+
+/**
+ * Identity resolver simply resolves a value to itself and is useful when value being resolved
+ * is a node controller id.
+ */
+public class IdentityResolver implements INodeResolver {
+
+    @Override
+    public String resolveNode(String value) {
+        return value;
+    }
+
+}