This change integrates a run statement for running Pregelix jobs into the AQL.

Therefore it also provides a new FlushOperator to flush all memory components of a dataset to disc.

Change-Id: I1f97cfdc79943abf035a7342bb777d59af6518e9
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/193
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
index 23553d1..e4dd711 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
@@ -30,7 +30,7 @@
     }
 
     protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE };
+        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE, Kind.RUN };
         return Arrays.asList(statementsArray);
     }
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 25dc341..5fac0f6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.asterix.aql.translator;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
@@ -54,6 +56,7 @@
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
 import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
 import edu.uci.ics.asterix.aql.expression.Identifier;
 import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
 import edu.uci.ics.asterix.aql.expression.InsertStatement;
@@ -61,13 +64,17 @@
 import edu.uci.ics.asterix.aql.expression.LoadStatement;
 import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
 import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.VarIdentifier;
+import edu.uci.ics.asterix.aql.expression.VariableExpr;
 import edu.uci.ics.asterix.aql.expression.WriteStatement;
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -115,7 +122,10 @@
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -127,20 +137,30 @@
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import edu.uci.ics.asterix.translator.TypeTranslator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -168,7 +188,7 @@
     private final SessionConfig sessionConfig;
     private final OutputFormat pdf;
     private Dataverse activeDefaultDataverse;
-    private List<FunctionDecl> declaredFunctions;
+    private final List<FunctionDecl> declaredFunctions;
 
     public AqlTranslator(List<Statement> aqlStatements, PrintWriter out, SessionConfig pc, APIFramework.OutputFormat pdf)
             throws MetadataException, AsterixException {
@@ -191,7 +211,7 @@
 
     /**
      * Compiles and submits for execution a list of AQL statements.
-     *
+     * 
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
      * @param hdc
@@ -201,8 +221,8 @@
      * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
      * @throws Exception
      */
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
-            ResultDelivery resultDelivery) throws Exception {
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+            throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -334,6 +354,11 @@
                     outputFile = result.second;
                     break;
                 }
+
+                case RUN: {
+                    handleRunStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
             }
         }
     }
@@ -506,8 +531,7 @@
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
                     } else {
-                        validateCompactionPolicy(compactionPolicy,
-                                                 compactionPolicyProperties, mdTxnCtx, false);
+                        validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
                     }
                     if (filterField != null) {
                         aRecordType.validateFilterField(filterField);
@@ -2346,6 +2370,232 @@
             MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
+    
+    private void handleRunStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws AsterixException, Exception {
+        RunStatement runStmt = (RunStatement) stmt;
+        switch(runStmt.getSystem()) {
+            case "pregel":
+            case "pregelix":
+                handlePregelixStatement(metadataProvider, runStmt, hcc);
+                break;
+            default:
+                throw new AlgebricksException("The system \""+runStmt.getSystem()+"\" specified in your run statement is not supported.");
+        }
+        
+    }
+
+    private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws AsterixException, Exception {
+
+        RunStatement pregelixStmt = (RunStatement) stmt;
+        boolean bActiveTxn = true;
+
+        String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
+        String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+        String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
+        String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
+        
+        if(dataverseNameFrom != dataverseNameTo) {
+            throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
+        }
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+
+        try {
+
+            // construct input paths
+            Index fromIndex = null;
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
+                    .getDatasetNameFrom().getValue());
+            for (Index ind : indexes) {
+                if (ind.isPrimaryIndex())
+                    fromIndex = ind;
+            }
+
+            if (fromIndex == null) {
+                throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
+            }
+
+            IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName()).first;
+            StringBuilder fromSplitsPaths = new StringBuilder();
+
+            for (FileSplit f : fromSplits.getFileSplits()) {
+                fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+                fromSplitsPaths.append(",");
+            }
+            fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
+
+            // Construct output paths
+            Index toIndex = null;
+            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
+                    .getDatasetNameTo().getValue());
+            for (Index ind : indexes) {
+                if (ind.isPrimaryIndex())
+                    toIndex = ind;
+            }
+
+            if (toIndex == null) {
+                throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
+            }
+
+            IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+                    dataverseNameTo, datasetNameTo, toIndex.getIndexName()).first;
+            StringBuilder toSplitsPaths = new StringBuilder();
+
+            for (FileSplit f : toSplits.getFileSplits()) {
+                toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+                toSplitsPaths.append(",");
+            }
+            toSplitsPaths.setLength(toSplitsPaths.length() - 1);
+
+            try {
+                Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
+                DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
+                        pregelixStmt.getDatasetNameTo(), true);
+                this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+                IDatasetDetailsDecl idd = new InternalDetailsDecl(new Identifier(toDataset.getDatasetDetails()
+                        .getNodeGroupName()), toIndex.getKeyFieldNames(), false, toDataset.getDatasetDetails()
+                        .getCompactionPolicy(), toDataset.getDatasetDetails().getCompactionPolicyProperties(), null);
+                DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+                        toDataset.getHints(), toDataset.getDatasetType(), idd, false);
+                this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
+            }
+
+            // Flush source dataset
+            flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
+
+            // call Pregelix
+            String pregelix_home = System.getenv("PREGELIX_HOME");
+            if (pregelix_home == null) {
+                throw new AlgebricksException("PREGELIX_HOME is not defined!");
+            }
+
+            // construct command
+            ArrayList<String> cmd = new ArrayList<String>();
+            cmd.add("bin/pregelix");
+            cmd.add(pregelixStmt.getParameters().get(0)); // jar
+            cmd.add(pregelixStmt.getParameters().get(1)); // class
+            for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+                cmd.add(s);
+            }
+            cmd.add("-inputpaths");
+            cmd.add(fromSplitsPaths.toString());
+            cmd.add("-outputpath");
+            cmd.add(toSplitsPaths.toString());
+
+            StringBuilder command = new StringBuilder();
+            for (String s : cmd) {
+                command.append(s);
+                command.append(" ");
+            }
+            LOGGER.info("Running Pregelix Command: " + command.toString());
+
+            ProcessBuilder pb = new ProcessBuilder(cmd);
+            pb.directory(new File(pregelix_home));
+            pb.redirectErrorStream(true);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            Process pr = pb.start();
+
+            int resultState = 0;
+
+            BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+            String line;
+            while ((line = in.readLine()) != null) {
+                System.out.println(line);
+                if (line.contains("job finished")) {
+                    resultState = 1;
+                }
+                if (line.contains("Exception") || line.contains("Error")) {
+
+                    if (line.contains("Connection refused")) {
+                        throw new AlgebricksException(
+                                "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
+                    }
+
+                    if (line.contains("Could not find or load main class")) {
+                        throw new AlgebricksException(
+                                "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
+                    }
+
+                    if (line.contains("ClassNotFoundException")) {
+                        throw new AlgebricksException(
+                                "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
+                    }
+
+                    if (line.contains("HyracksException")) {
+                        throw new AlgebricksException(
+                                "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
+                                        + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
+                                        + "Check also if your datatypes in Pregelix and Asterix are matching.");
+                    }
+
+                    throw new AlgebricksException(
+                            "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+                                    + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+                                    + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+                }
+            }
+            pr.waitFor();
+            in.close();
+
+            if (resultState != 1) {
+                throw new AlgebricksException(
+                        "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+                                + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+                                + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+            }
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE
+                    .pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+        }
+    }
+
+    private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+            throws Exception {
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+        int frameSize = compilerProperties.getFrameSize();
+        JobSpecification spec = new JobSpecification(frameSize);
+
+        RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+        AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+                new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+        edu.uci.ics.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
+        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+                MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName).getDatasetId());
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName);
+        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
+                primaryPartitionConstraint);
+
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, false);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        runJob(hcc, spec, true);
+    }
 
     private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
@@ -2353,8 +2603,8 @@
         return jobIds[0];
     }
 
-    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out,
-            boolean waitForCompletion) throws Exception {
+    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
+            throws Exception {
         JobId[] startedJobIds = new JobId[jobs.length];
         for (int i = 0; i < jobs.length; i++) {
             JobSpecification spec = jobs[i].getJobSpec();
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
new file mode 100644
index 0000000..8d47100f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
@@ -0,0 +1,47 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type TwitterUserType as open {
+        screen-name: string,
+        lang: string,
+        friends_count: int32,
+        statuses_count: int32,
+        name: string,
+        followers_count: int32
+}
+
+create type TweetMessageType as open {
+       tweetid: int64,
+       user: TwitterUserType,
+       sender-location: point?,
+       send-time: datetime,
+       referred-topics: {{ string }},
+       message-text: string,
+       retweeted-from: int64, 
+       forwarded-from: int64
+}
+
+create dataset TwitterMsgs(TweetMessageType)
+    primary key tweetid;
+
+create dataset TwitterUsers(TwitterUserType)
+    primary key screen-name;
+
+    create type TMEdge as open {
+        tweetid: int64,
+        value: float?
+    }
+
+    create type TMGraph as open {
+        tweetid: int64,
+        rank-value: double?,
+        populated-by: {{TMEdge}}
+    }
+
+
+create dataset MyInputGraph(TMGraph)
+    primary key tweetid;
+
+create dataset MyOutputGraph(TMGraph)
+    primary key tweetid;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
new file mode 100644
index 0000000..5c8272f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
@@ -0,0 +1,34 @@
+use dataverse Pregelix;
+
+insert into dataset TwitterUsers (
+{{
+{"screen-name":"NathanGiesen@211","lang":"en","friends_count":18,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},
+{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},
+{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},
+{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136}
+}})
+
+insert into dataset TwitterMsgs (
+{{
+    {"tweetid":1,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("47.44,80.65"),"send-time":datetime("2008-04-26T10:10:00"),"referred-topics":{{"t-mobile","customization"}},"message-text":" love t-mobile its customization is good:)", "retweeted-from":2, "forwarded-from":3},
+    {"tweetid":2,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("32.84,67.14"),"send-time":datetime("2010-05-13T10:10:00"),"referred-topics":{{"verizon","shortcut-menu"}},"message-text":" like verizon its shortcut-menu is awesome:)", "retweeted-from":7, "forwarded-from":1},
+    {"tweetid":3,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("29.72,75.8"),"send-time":datetime("2006-11-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" like motorola the speed is good:)", "retweeted-from":8, "forwarded-from":6},
+    {"tweetid":4,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("39.28,70.48"),"send-time":datetime("2011-12-26T10:10:00"),"referred-topics":{{"sprint","voice-command"}},"message-text":" like sprint the voice-command is mind-blowing:)", "retweeted-from":1, "forwarded-from":3},
+    {"tweetid":5,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("40.09,92.69"),"send-time":datetime("2006-08-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" can't stand motorola its speed is terrible:(", "retweeted-from":8, "forwarded-from":6},
+    {"tweetid":6,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("47.51,83.99"),"send-time":datetime("2010-05-07T10:10:00"),"referred-topics":{{"iphone","voice-clarity"}},"message-text":" like iphone the voice-clarity is good:)", "retweeted-from":4, "forwarded-from":5},
+    {"tweetid":7,"user":{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136},"sender-location":point("36.21,72.6"),"send-time":datetime("2011-08-25T10:10:00"),"referred-topics":{{"samsung","platform"}},"message-text":" like samsung the platform is good", "retweeted-from":2, "forwarded-from":7},
+    {"tweetid":8,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("46.05,93.34"),"send-time":datetime("2005-10-14T10:10:00"),"referred-topics":{{"t-mobile","shortcut-menu"}},"message-text":" like t-mobile the shortcut-menu is awesome:)", "retweeted-from":3, "forwarded-from":7},
+    {"tweetid":9,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("36.86,74.62"),"send-time":datetime("2012-07-21T10:10:00"),"referred-topics":{{"verizon","voicemail-service"}},"message-text":" love verizon its voicemail-service is awesome", "retweeted-from":6, "forwarded-from":2},
+    {"tweetid":10,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("29.15,76.53"),"send-time":datetime("2008-01-26T10:10:00"),"referred-topics":{{"verizon","voice-clarity"}},"message-text":" hate verizon its voice-clarity is OMG:(", "retweeted-from":4, "forwarded-from":5},
+    {"tweetid":11,"user":{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},"sender-location":point("37.59,68.42"),"send-time":datetime("2008-03-09T10:10:00"),"referred-topics":{{"iphone","platform"}},"message-text":" can't stand iphone its platform is terrible", "retweeted-from":6, "forwarded-from":3},
+    {"tweetid":12,"user":{"screen-name":"OliJackson_512","lang":"en","friends_count":445,"statuses_count":164,"name":"Oli Jackson","followers_count":22649},"sender-location":point("24.82,94.63"),"send-time":datetime("2010-02-13T10:10:00"),"referred-topics":{{"samsung","voice-command"}},"message-text":" like samsung the voice-command is amazing:)", "retweeted-from":6, "forwarded-from":5}
+}})
+
+insert into dataset MyInputGraph
+for $tm in dataset TwitterMsgs
+let $links:={{ {"tweetid":$tm.retweeted-from}, {"tweetid":$tm.forwarded-from}}}
+return {
+      "tweetid": $tm.tweetid,
+      "rank-value": 0.0,
+      "populated-by": $links
+}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
new file mode 100644
index 0000000..78fb02e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.PageRankVertex"
+ "-ip 10.0.2.15 -port 3199 -vnum 17")
+from dataset MyInputGraph 
+to dataset MyOutputGraph;
+
+for $n in dataset MyOutputGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
new file mode 100644
index 0000000..1439cc3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type EdgeType as open {
+    destVertexId: int64,
+    value: float?
+}
+create type NodeType as open {
+   id: int64,
+   value: int64?,
+   edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
new file mode 100644
index 0000000..5537960
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
@@ -0,0 +1,27 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+    {"id":0,"value":0,"edges":{{ {"destVertexId":1}}}},
+{"id":1,"value":1,"edges":{{ {"destVertexId":1},{"destVertexId":2}}}},
+{"id":2,"value":2,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3}}}},
+{"id":3,"value":3,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4}}}},
+{"id":4,"value":4,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5}}}},
+{"id":5,"value":5,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6}}}},
+{"id":6,"value":6,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7}}}},
+{"id":7,"value":7,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8}}}},
+{"id":8,"value":8,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9}}}},
+{"id":9,"value":9,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9},{"destVertexId":0}}}},
+{"id":10,"value":10,"edges":{{ {"destVertexId":11}}}},
+{"id":11,"value":11,"edges":{{ {"destVertexId":11},{"destVertexId":12}}}},
+{"id":12,"value":12,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13}}}},
+{"id":13,"value":13,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14}}}},
+{"id":14,"value":14,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15}}}},
+{"id":15,"value":15,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16}}}},
+{"id":16,"value":16,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17}}}},
+{"id":17,"value":17,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18}}}},
+{"id":18,"value":18,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}}}},
+{"id":19,"value":19,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19},   {"destVertexId":10}}}}
+}}
+
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
new file mode 100644
index 0000000..77c5d7e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ConnectedComponentsVertex"
+ "-ip 10.0.2.15 -port 3199")
+from dataset InputGraph 
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
new file mode 100644
index 0000000..9642ce3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type EdgeType as open {
+    destVertexId: int64,
+    cost: float?
+}
+create type NodeType as open {
+    id: int64,
+    value: double?,
+    edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
new file mode 100644
index 0000000..f328614
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
@@ -0,0 +1,16 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+{"id":0,  "edges":{{ {"destVertexId":1, "cost":1.0f}}}},
+{"id":1,  "edges":{{ {"destVertexId":3, "cost":4.0f},{"destVertexId":2, "cost":3.0f}}}},
+{"id":2,  "edges":{{ {"destVertexId":4, "cost":5.0f},{"destVertexId":5, "cost":23.0f}}}},
+{"id":3,  "edges":{{ {"destVertexId":2, "cost":1.0f},{"destVertexId":8, "cost":13.0f}}}},
+{"id":4,  "edges":{{ {"destVertexId":1, "cost":5.0f},{"destVertexId":2, "cost":8.0f},{"destVertexId":3, "cost":23.0f},{"destVertexId":4, "cost":12.0f}}}},
+{"id":5,  "edges":{{ {"destVertexId":6, "cost":12.0f},{"destVertexId":7, "cost":17.0f}}}},
+{"id":6,  "edges":{{ {"destVertexId":1, "cost":12.0f},{"destVertexId":2, "cost":1.0f}}}},
+{"id":7,  "edges":{{ {"destVertexId":9, "cost":100.0f}}}},
+{"id":8,  "edges":{{ {"destVertexId":4, "cost":11.0f}}}},
+{"id":9,  "edges":{{ {"destVertexId":1, "cost":16.0f},{"destVertexId":2, "cost":9.0f}}}}
+}}
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
new file mode 100644
index 0000000..0bb4229
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ShortestPathsVertex"
+ "-ip 10.0.2.15 -port 3199 -source-vertex 0")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
new file mode 100644
index 0000000..4b6f605
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
@@ -0,0 +1,13 @@
+[ { "tweetid": 1i64, "rank-value": 0.07349647505081802d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 2i64, "rank-value": 0.10497306443795257d, "populated-by": {{ { "tweetid": 7i64, "value": 0.0f }, { "tweetid": 1i64, "value": 0.0f } }} }
+, { "tweetid": 3i64, "rank-value": 0.09458886502271134d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 4i64, "rank-value": 0.04783722202788025d, "populated-by": {{ { "tweetid": 1i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 5i64, "rank-value": 0.051587222027880256d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 6i64, "rank-value": 0.0822549135142923d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 7i64, "rank-value": 0.14484555969829038d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 8i64, "rank-value": 0.0710049135142923d, "populated-by": {{ { "tweetid": 3i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 9i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 2i64, "value": 0.0f } }} }
+, { "tweetid": 10i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 11i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 12i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
new file mode 100644
index 0000000..82091fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
@@ -0,0 +1,21 @@
+[ { "id": 0i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f } }} }
+, { "id": 1i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f } }} }
+, { "id": 2i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f } }} }
+, { "id": 3i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f } }} }
+, { "id": 4i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f } }} }
+, { "id": 5i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f } }} }
+, { "id": 6i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f } }} }
+, { "id": 7i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f } }} }
+, { "id": 8i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f } }} }
+, { "id": 9i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f }, { "destVertexId": 0i64, "value": 0.0f } }} }
+, { "id": 10i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f } }} }
+, { "id": 11i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f } }} }
+, { "id": 12i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f } }} }
+, { "id": 13i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f } }} }
+, { "id": 14i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f } }} }
+, { "id": 15i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f } }} }
+, { "id": 16i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f } }} }
+, { "id": 17i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f } }} }
+, { "id": 18i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f } }} }
+, { "id": 19i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f }, { "destVertexId": 10i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
new file mode 100644
index 0000000..38eeba3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
@@ -0,0 +1,11 @@
+[ { "id": 0i64, "value": 0.0d, "edges": {{ { "destVertexId": 1i64, "cost": 1.0f } }} }
+, { "id": 1i64, "value": 1.0d, "edges": {{ { "destVertexId": 3i64, "cost": 4.0f }, { "destVertexId": 2i64, "cost": 3.0f } }} }
+, { "id": 2i64, "value": 4.0d, "edges": {{ { "destVertexId": 4i64, "cost": 5.0f }, { "destVertexId": 5i64, "cost": 23.0f } }} }
+, { "id": 3i64, "value": 5.0d, "edges": {{ { "destVertexId": 2i64, "cost": 1.0f }, { "destVertexId": 8i64, "cost": 13.0f } }} }
+, { "id": 4i64, "value": 9.0d, "edges": {{ { "destVertexId": 1i64, "cost": 5.0f }, { "destVertexId": 2i64, "cost": 8.0f }, { "destVertexId": 3i64, "cost": 23.0f }, { "destVertexId": 4i64, "cost": 12.0f } }} }
+, { "id": 5i64, "value": 27.0d, "edges": {{ { "destVertexId": 6i64, "cost": 12.0f }, { "destVertexId": 7i64, "cost": 17.0f } }} }
+, { "id": 6i64, "value": 39.0d, "edges": {{ { "destVertexId": 1i64, "cost": 12.0f }, { "destVertexId": 2i64, "cost": 1.0f } }} }
+, { "id": 7i64, "value": 44.0d, "edges": {{ { "destVertexId": 9i64, "cost": 100.0f } }} }
+, { "id": 8i64, "value": 18.0d, "edges": {{ { "destVertexId": 4i64, "cost": 11.0f } }} }
+, { "id": 9i64, "value": 144.0d, "edges": {{ { "destVertexId": 1i64, "cost": 16.0f }, { "destVertexId": 2i64, "cost": 9.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index c3b097a..baf330c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2346,6 +2346,21 @@
             </compilation-unit>
         </test-case>
     </test-group>
+    <!-- <test-group name="graph">
+        <test-case FilePath="graph">
+            <compilation-unit name="pregel-q01">
+                <output-dir compare="Text">pregel-q01</output-dir>
+            </compilation-unit>
+            <test-case FilePath="graph">
+            <compilation-unit name="pregel-q02">
+                <output-dir compare="Text">pregel-q02</output-dir>
+            </compilation-unit>
+            <test-case FilePath="graph">
+            <compilation-unit name="pregel-q03">
+                <output-dir compare="Text">pregel-q03</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group> -->
     <test-group name="index-join">
         <test-case FilePath="index-join">
             <compilation-unit name="btree-primary-equi-join">
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
index 6bdca80..abac9df 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
@@ -44,7 +44,8 @@
         CREATE_FUNCTION,
         FUNCTION_DROP,
         COMPACT, 
-        EXTERNAL_DATASET_REFRESH
+        EXTERNAL_DATASET_REFRESH,
+        RUN
     }
 
     public abstract Kind getKind();
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java
new file mode 100644
index 0000000..200a620
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class RunStatement implements Statement {
+
+    private String system;
+    private List<String> parameters;
+    private final Identifier dataverseNameFrom;
+    private final Identifier datasetNameFrom;
+    private final Identifier dataverseNameTo;
+    private final Identifier datasetNameTo;
+
+    public RunStatement(String system, List<String> parameters, Identifier dataverseNameFrom, Identifier datasetNameFrom, Identifier dataverseNameTo, Identifier datasetNameTo) {
+        this.system = system;
+        this.parameters = parameters;
+        this.datasetNameFrom = datasetNameFrom;
+        this.dataverseNameFrom = dataverseNameFrom;
+        this.datasetNameTo = datasetNameTo;
+        this.dataverseNameTo = dataverseNameTo;
+    }
+
+    public String getSystem() {
+        return system;
+    }
+
+    public List<String> getParameters() {
+        return parameters;
+    }
+
+    public Identifier getDataverseNameFrom() {
+        return dataverseNameFrom;
+    }
+
+    public Identifier getDatasetNameFrom() {
+        return datasetNameFrom;
+    }
+
+    public Identifier getDataverseNameTo() {
+        return dataverseNameTo;
+    }
+
+    public Identifier getDatasetNameTo() {
+        return datasetNameTo;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.RUN;
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
index ad8c791..21a2388 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
@@ -61,6 +61,7 @@
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause.OrderModifier;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.QuantifiedPair;
 import edu.uci.ics.asterix.aql.expression.Query;
@@ -319,7 +320,7 @@
     public void visit(FieldAccessor fa, Integer step) throws AsterixException {
         out.println(skip(step) + "FieldAccessor [");
         fa.getExpr().accept(this, step + 1);
-        out.println(skip(step + 1) + "Field=" + ((FieldAccessor) fa).getIdent().getValue());
+        out.println(skip(step + 1) + "Field=" + fa.getIdent().getValue());
         out.println(skip(step) + "]");
 
     }
@@ -557,4 +558,10 @@
 
     }
 
+    @Override
+    public void visit(RunStatement stmt, Integer arg) throws AsterixException {
+        // TODO Auto-generated method stub
+
+    }
+
 }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
index ce0e121..f04c3a0 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
@@ -15,8 +15,8 @@
 package edu.uci.ics.asterix.aql.expression.visitor;
 
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -27,9 +27,9 @@
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
 import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.ForClause;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -49,6 +49,7 @@
 import edu.uci.ics.asterix.aql.expression.OperatorExpr;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RecordConstructor;
@@ -170,4 +171,6 @@
     void visit(FunctionDropStatement fds, T arg) throws AsterixException;
 
     void visit(CompactStatement fds, T arg) throws AsterixException;
+
+    void visit(RunStatement stmt, T arg) throws AsterixException;
 }
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 826a27b..2cd38c6 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -213,6 +213,7 @@
     | stmt = CompactStatement()
     | stmt = Query()
     | stmt = RefreshExternalDatasetStatement()
+    | stmt = RunStatement()
   )
   {
     return stmt;
@@ -398,6 +399,27 @@
 	}
 }
 
+RunStatement RunStatement() throws ParseException:
+{
+  String system = null;
+  String tmp;
+  ArrayList<String> parameters = new  ArrayList<String>();
+  Pair<Identifier,Identifier> nameComponentsFrom = null;
+  Pair<Identifier,Identifier> nameComponentsTo = null;
+}
+{
+  "run" system = Identifier()<LEFTPAREN> ( tmp = Identifier() [<COMMA>]
+    {
+      parameters.add(tmp);
+    }
+  )*<RIGHTPAREN>
+  <FROM> <DATASET> nameComponentsFrom  = QualifiedName()
+  "to" <DATASET> nameComponentsTo  = QualifiedName() 
+    {
+      return new RunStatement(system, parameters, nameComponentsFrom.first, nameComponentsFrom.second, nameComponentsTo.first, nameComponentsTo.second);
+    }
+}
+
 CreateIndexStatement IndexSpecification() throws ParseException:
 {
   CreateIndexStatement cis = new CreateIndexStatement();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 379a7eb..14db1d2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -56,7 +56,7 @@
     private final long capacity;
     private long used;
     private final ILogManager logManager;
-    private LogRecord logRecord;
+    private final LogRecord logRecord;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
             ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager) {
@@ -231,7 +231,7 @@
 
     private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
         if (iInfo.isOpen) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
         }
@@ -336,7 +336,7 @@
     }
 
     private static class IndexInfo extends Info {
-        private ILSMIndex index;
+        private final ILSMIndex index;
 
         public IndexInfo(ILSMIndex index) {
             this.index = index;
@@ -357,11 +357,13 @@
             this.isExternal = isExternal;
         }
 
+        @Override
         public void touch() {
             super.touch();
             lastAccess = System.currentTimeMillis();
         }
 
+        @Override
         public void untouch() {
             super.untouch();
             lastAccess = System.currentTimeMillis();
@@ -409,6 +411,7 @@
 
         }
 
+        @Override
         public String toString() {
             return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
                     + ", lastAccess: " + lastAccess + "}";
@@ -426,6 +429,13 @@
         }
     }
 
+    public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+        DatasetInfo datasetInfo = datasetInfos.get(datasetId);
+        if (datasetInfo != null) {
+            flushDatasetOpenIndexes(datasetInfo, asyncFlush);
+        }
+    }
+
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
 
         List<DatasetInfo> laggingDatasets = new ArrayList<DatasetInfo>();
@@ -433,7 +443,7 @@
         //find dataset with min lsn < targetLSN
         for (DatasetInfo dsInfo : datasetInfos.values()) {
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) iInfo.index)
+                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
                         .getIOOperationCallback();
                 if (!((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
@@ -479,8 +489,8 @@
         if (asyncFlush) {
 
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
-                        NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
                 accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
             }
         } else {
@@ -541,6 +551,7 @@
         datasetInfos.clear();
     }
 
+    @Override
     public void dumpState(OutputStream outputStream) throws IOException {
         StringBuilder sb = new StringBuilder();
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 5f15bf3..744c208 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -91,7 +91,6 @@
         }
 
         if (needsFlush) {
-
             LogRecord logRecord = new LogRecord();
             logRecord.formFlushLogRecord(datasetID, this);
             try {
@@ -107,8 +106,8 @@
         synchronized (this) {
             for (ILSMIndex lsmIndex : datasetLifecycleManager.getDatasetIndexes(datasetID)) {
                 //get resource
-                ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
-                        NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
 
                 //update resource lsn
                 AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
@@ -121,6 +120,7 @@
         }
     }
 
+    @Override
     public void exclusiveJobCommitted() throws HyracksDataException {
         numActiveOperations.set(0);
         flushIfFull();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
index 02e40b2..436fe4d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -10,13 +10,13 @@
 public class MetadataLockManager {
 
     public static MetadataLockManager INSTANCE = new MetadataLockManager();
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
-    private ConcurrentHashMap<String, DatasetLock> datasetsLocks;
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
-    private ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
+    private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
 
     private MetadataLockManager() {
         dataversesLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
@@ -509,4 +509,25 @@
         releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
         releaseDataverseReadLock(dataverseName);
     }
+
+    public void pregelixBegin(String dataverseName, String datasetFullyQualifiedNameFrom,
+            String datasetFullyQualifiedNameTo) {
+        acquireDataverseReadLock(dataverseName);
+
+        if (datasetFullyQualifiedNameFrom.compareTo(datasetFullyQualifiedNameTo) < 0) {
+            acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
+            acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
+        } else {
+            acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
+            acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
+        }
+    }
+
+    public void pregelixEnd(String dataverseName, String datasetFullyQualifiedNameFrom,
+            String datasetFullyQualifiedNameTo) {
+
+        releaseDatasetReadLock(datasetFullyQualifiedNameFrom);
+        releaseDatasetWriteLock(datasetFullyQualifiedNameTo);
+        releaseDataverseReadLock(dataverseName);
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
new file mode 100644
index 0000000..6009ff1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final JobId jobId;
+    private final DatasetId datasetId;
+
+    public FlushDatasetOperatorDescriptor(IOperatorDescriptorRegistry spec, JobId jobId, int datasetId) {
+        super(spec, 1, 0);
+        this.jobId = jobId;
+        this.datasetId = new DatasetId(datasetId);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+            @Override
+            public void open() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                this.close();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                            .getApplicationContext().getApplicationObject();
+                    DatasetLifecycleManager datasetLifeCycleManager = (DatasetLifecycleManager) runtimeCtx
+                            .getIndexLifecycleManager();
+                    ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager();
+                    ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+                    // get the local transaction
+                    ITransactionContext txnCtx = txnManager.getTransactionContext(jobId, false);
+                    // lock the dataset granule
+                    lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
+                    // flush the dataset synchronously
+                    datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+                } catch (ACIDException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+        };
+    }
+}