Asterix NCs Fault Tolerance

This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using proper error message.
- Basic replication test cases using vagrant virtual cluster for:
   1. LSM bulkload components replication.
   2. LSM Memory components replication and recovery.
   3. Metadata node takeover.
These test cases will be part of the cluster test profile.

Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -89,4 +90,17 @@
     public IReplicationChannel getReplicationChannel();
 
     public void initializeResourceIdFactory() throws HyracksDataException;
+
+    /**
+     * Exports the metadata node to the metadata RMI port.
+     * @throws RemoteException
+     */
+    public void exportMetadataNodeStub() throws RemoteException;
+
+    /**
+     * Initializes the metadata node and bootstraps the metadata.
+     * @param newUniverse
+     * @throws Exception
+     */
+    public void initializeMetadata(boolean newUniverse) throws Exception;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+    /**
+     * Starts the global recovery process if the cluster state changed to ACTIVE.
+     */
+    public void startGlobalRecovery();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return accessor.getClusterPartitions();
     }
+
+    public Map<String, String> getTransactionLogDirs() {
+        return accessor.getTransactionLogDirs();
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@
     private static int REPLICATION_TIME_OUT_DEFAULT = 15;
 
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
-    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
@@ -102,8 +101,8 @@
             }
 
             if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
-                        + " in cluster configurations");
+                LOGGER.log(Level.WARNING,
+                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
                 return null;
             }
 
@@ -179,13 +178,6 @@
         return replicaIds;
     }
 
-    public String getReplicationStore() {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationStore();
-        }
-        return REPLICATION_STORE_DEFAULT;
-    }
-
     public int getReplicationFactor() {
         if (cluster != null) {
             if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
-
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@
      * @return ICCApplicationContext implementation instance
      */
     public ICCApplicationContext getCCApplicationContext();
+
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager();
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+
+    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String failedNode;
+    private final long requestId;
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+            Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.failedNode = failedNode;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getFailedNode() {
+        return failedNode;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: " + requestId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Failed Node: " + failedNode);
+        sb.append(" Partitions: ");
+        for (Integer partitionId : partitions) {
+            sb.append(partitionId + ",");
+        }
+        //remove last comma
+        sb.charAt(sb.length() - 1);
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@
         RESOURCE_ID_REQUEST,
         RESOURCE_ID_RESPONSE,
         REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE
+        REPORT_MAX_RESOURCE_ID_RESPONSE,
+        TAKEOVER_PARTITIONS_REQUEST,
+        TAKEOVER_PARTITIONS_RESPONSE,
+        TAKEOVER_METADATA_NODE_REQUEST,
+        TAKEOVER_METADATA_NODE_RESPONSE
     }
 
     public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
 public interface IRemoteRecoveryManager {
 
+    /**
+     * Attempts to perform the remote recovery process from an active remote replica.
+     */
     public void performRemoteRecovery();
 
+    /**
+     * Performs the partitions takeover process from the {@code failedNode}
+     * @param failedNode
+     * @param partitions
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@
 
 public interface IReplicaResourcesManager {
 
-    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
-    public String getLocalStorageFolder();
-
+    /**
+     * @param remoteNodes
+     * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+     */
     public long getMinRemoteLSN(Set<String> remoteNodes);
 
+    /**
+     * @param partitions
+     * @return the minimum LSN of all indexes that belong to {@code partitions}.
+     */
+    public long getPartitionsMinLSN(Integer[] partitions);
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@
      * @throws HyracksDataException
      */
     public long getLocalMinFirstLSN() throws HyracksDataException;
+
+    /**
+     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+     * @param partitions
+     * @param lowWaterMarkLSN
+     * @param failedNode
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    }
+
+    public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+        return dataverseName + File.separator + fullIndexName;
+    }
+
+    private static String prepareFullIndexName(String datasetName, String idxName) {
+        return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
 }
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
 	<xs:element name="enabled" type="xs:boolean" />
 	<xs:element name="replication_port" type="xs:integer" />
 	<xs:element name="replication_factor" type="xs:integer" />
-	<xs:element name="replication_store" type="xs:string" />
+	<xs:element name="auto_failover" type="xs:boolean" />
 	<xs:element name="replication_time_out" type="xs:integer" />
 
 	<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
 				<xs:element ref="cl:enabled" />
 				<xs:element ref="cl:replication_port" />
 				<xs:element ref="cl:replication_factor" />
-				<xs:element ref="cl:replication_store" />
+				<xs:element ref="cl:auto_failover" />
 				<xs:element ref="cl:replication_time_out" />
 			</xs:sequence>
 		</xs:complexType>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 447e96d..f54db63 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,12 +24,14 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -54,6 +56,7 @@
 import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 
 public class TestExecutor {
@@ -361,6 +364,14 @@
         return getProcessOutput(p);
     }
 
+    private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
+        pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
+        Process p = pb.start();
+        p.waitFor();
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+
     private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
         String targetWord = "queries" + File.separator;
         int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -565,6 +576,22 @@
                             }
                             System.err.println("...but that was expected.");
                             break;
+                        case "vagrant_script":
+                            try {
+                                String[] command = statement.trim().split(" ");
+                                if (command.length != 2) {
+                                    throw new Exception("invalid vagrant script format");
+                                }
+                                String nodeId = command[0];
+                                String scriptName = command[1];
+                                String output = executeVagrantScript(pb, nodeId, scriptName);
+                                if (output.contains("ERROR")) {
+                                    throw new Exception(output);
+                                }
+                            } catch (Exception e) {
+                                throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+                            }
+                            break;
                         default:
                             throw new IllegalArgumentException("No statements of type " + ctx.getType());
                     }