Merge branch 'cheshire-cat'

Change-Id: Ib901e46e1b0e1ce98f4b891ab9fba912ac4101b4
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index fec0b38..9efb6f8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -20,13 +20,13 @@
 
 import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
 import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH;
-import static org.apache.hyracks.api.exceptions.IFormattedException.getError;
 
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -295,14 +295,41 @@
         responseWriter.flush();
     }
 
+    protected boolean isRequestPermittedForWrite(IServletRequest request, IServletResponse response) {
+        if (!isRequestOnLoopback(request)) {
+            rejectForbidden(response);
+            return false;
+        }
+        return true;
+    }
+
+    protected boolean isRequestOnLoopback(IServletRequest request) {
+        if (request.getLocalAddress() != null && request.getRemoteAddress() != null) {
+            InetAddress local = request.getLocalAddress().getAddress();
+            InetAddress remote = request.getRemoteAddress().getAddress();
+            return remote.isLoopbackAddress() && local.isLoopbackAddress();
+        } else {
+            return false;
+        }
+    }
+
+    protected static void rejectForbidden(IServletResponse response) {
+        response.setStatus(HttpResponseStatus.FORBIDDEN);
+        response.writer().write("{ \"error\": \"Forbidden\" }");
+    }
+
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
-        handleModification(request, response, LibraryOperation.UPSERT);
+        if (isRequestPermittedForWrite(request, response)) {
+            handleModification(request, response, LibraryOperation.UPSERT);
+        }
     }
 
     @Override
     protected void delete(IServletRequest request, IServletResponse response) {
-        handleModification(request, response, LibraryOperation.DELETE);
+        if (isRequestPermittedForWrite(request, response)) {
+            handleModification(request, response, LibraryOperation.DELETE);
+        }
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index cc4b25f..2891710 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -324,7 +324,8 @@
             requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
-            List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+            List<String> responses =
+                    (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout, false);
             stats = formatStats(responses);
             statsTimestamp = System.currentTimeMillis();
             notifySubscribers(statsUpdatedEvent);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 7154814..8c4675c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -406,13 +406,13 @@
                         handleCreateFunctionStatement(metadataProvider, stmt, stmtRewriter, requestParameters);
                         break;
                     case FUNCTION_DROP:
-                        handleFunctionDropStatement(metadataProvider, stmt);
+                        handleFunctionDropStatement(metadataProvider, stmt, requestParameters);
                         break;
                     case CREATE_LIBRARY:
                         handleCreateLibraryStatement(metadataProvider, stmt, hcc, requestParameters);
                         break;
                     case LIBRARY_DROP:
-                        handleLibraryDropStatement(metadataProvider, stmt, hcc);
+                        handleLibraryDropStatement(metadataProvider, stmt, hcc, requestParameters);
                         break;
                     case CREATE_SYNONYM:
                         handleCreateSynonymStatement(metadataProvider, stmt);
@@ -2684,7 +2684,8 @@
         return new Triple<>(paramTypeSignature, depTypeSignature, paramInlineTypeEntity);
     }
 
-    protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+    protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt,
+            IRequestParameters requestParameters) throws Exception {
         FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
         metadataProvider.validateDatabaseObjectName(signature.getDataverseName(), signature.getName(),
@@ -2693,14 +2694,14 @@
         signature.setDataverseName(dataverseName);
         lockUtil.dropFunctionBegin(lockManager, metadataProvider.getLocks(), dataverseName, signature.getName());
         try {
-            doDropFunction(metadataProvider, stmtDropFunction, signature);
+            doDropFunction(metadataProvider, stmtDropFunction, signature, requestParameters);
         } finally {
             metadataProvider.getLocks().unlock();
         }
     }
 
     protected boolean doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
-            FunctionSignature signature) throws Exception {
+            FunctionSignature signature, IRequestParameters requestParameters) throws Exception {
         DataverseName dataverseName = signature.getDataverseName();
         SourceLocation sourceLoc = stmtDropFunction.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2990,7 +2991,7 @@
     }
 
     protected void handleLibraryDropStatement(MetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         LibraryDropStatement stmtDropLibrary = (LibraryDropStatement) stmt;
         String libraryName = stmtDropLibrary.getLibraryName();
         metadataProvider.validateDatabaseObjectName(stmtDropLibrary.getDataverseName(), libraryName,
@@ -2998,14 +2999,15 @@
         DataverseName dataverseName = getActiveDataverseName(stmtDropLibrary.getDataverseName());
         lockUtil.dropLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
         try {
-            doDropLibrary(metadataProvider, stmtDropLibrary, dataverseName, libraryName, hcc);
+            doDropLibrary(metadataProvider, stmtDropLibrary, dataverseName, libraryName, hcc, requestParameters);
         } finally {
             metadataProvider.getLocks().unlock();
         }
     }
 
     protected boolean doDropLibrary(MetadataProvider metadataProvider, LibraryDropStatement stmtDropLibrary,
-            DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc) throws Exception {
+            DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
         JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ae50880..e1c39a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -47,11 +47,9 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -95,15 +93,19 @@
             synchronized (this) {
                 if (!recovering) {
                     recovering = true;
-                    /**
+                    /*
                      * Perform recovery on a different thread to avoid deadlocks in
                      * {@link org.apache.asterix.common.cluster.IClusterStateManager}
                      */
                     serviceCtx.getControllerService().getExecutor().submit(() -> {
                         try {
                             recover(appCtx);
-                        } catch (HyracksDataException e) {
-                            LOGGER.log(Level.ERROR, "Global recovery failed. Shutting down...", e);
+                        } catch (Throwable e) {
+                            try {
+                                LOGGER.fatal("Global recovery failed. Shutting down...", e);
+                            } catch (Throwable ignore) {
+                                // ignore any failure while logging so that the exit below always happens
+                            }
                             ExitUtil.exit(ExitUtil.EC_FAILED_TO_RECOVER);
                         }
                     });
@@ -112,24 +114,20 @@
         }
     }
 
-    protected void recover(ICcApplicationContext appCtx) throws HyracksDataException {
-        try {
-            LOGGER.info("Starting Global Recovery");
-            MetadataManager.INSTANCE.init();
-            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
-                int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
-                performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
-            }
-            mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            recoveryCompleted = true;
-            recovering = false;
-            LOGGER.info("Global Recovery Completed. Refreshing cluster state...");
-            appCtx.getClusterStateManager().refreshState();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+    protected void recover(ICcApplicationContext appCtx) throws Exception {
+        LOGGER.info("Starting Global Recovery");
+        MetadataManager.INSTANCE.init();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
+            int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
+            performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
         }
+        mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        recoveryCompleted = true;
+        recovering = false;
+        LOGGER.info("Global Recovery Completed. Refreshing cluster state...");
+        appCtx.getClusterStateManager().refreshState();
     }
 
     protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
@@ -150,7 +148,7 @@
             requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds));
         }
         messageBroker.sendSyncRequestToNCs(reqId, ncs, requests,
-                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
+                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs), false);
     }
 
     protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1f9ec32..0683207 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -65,18 +65,34 @@
     }
 
     @Override
-    public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+    public boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+        return sendMessage(msg, nodeId, false);
+    }
+
+    @Override
+    public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+        return sendMessage(msg, nodeId, true);
+    }
+
+    private boolean sendMessage(INcAddressedMessage msg, String nodeId, boolean realTime) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (msg instanceof ICcIdentifiedMessage) {
             ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
         }
         if (state != null) {
-            state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+            byte[] payload = JavaSerializationUtils.serialize(msg);
+            if (realTime) {
+                state.getNodeController().sendRealTimeApplicationMessageToNC(payload, null, nodeId);
+            } else {
+                state.getNodeController().sendApplicationMessageToNC(payload, null, nodeId);
+            }
+            return true;
         } else {
             if (LOGGER.isWarnEnabled()) {
                 LOGGER.warn("Couldn't send message to unregistered node (" + nodeId + ")");
             }
+            return false;
         }
     }
 
@@ -87,7 +103,7 @@
 
     @Override
     public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
-            long timeout) throws Exception {
+            long timeout, boolean realTime) throws Exception {
         MutableInt numRequired = new MutableInt(0);
         MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
                 MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
@@ -101,7 +117,10 @@
                     if (!(message instanceof ICcIdentifiedMessage)) {
                         throw new IllegalStateException("sync request message not cc identified: " + message);
                     }
-                    sendApplicationMessageToNC(message, nc);
+                    if (!(realTime ? sendRealTimeApplicationMessageToNC(message, nc)
+                            : sendApplicationMessageToNC(message, nc))) {
+                        throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "unable to send sync message to " + nc);
+                    }
                 }
                 long time = System.currentTimeMillis();
                 while (pair.getLeft().getValue() > 0) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 26f6524..1b216ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -79,6 +79,16 @@
     }
 
     @Override
+    public void sendRealTimeMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception {
+        ncs.sendRealTimeApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(message), null);
+    }
+
+    @Override
+    public void sendRealTimeMessageToPrimaryCC(ICcAddressedMessage message) throws Exception {
+        sendRealTimeMessageToCC(ncs.getPrimaryCcId(), message);
+    }
+
+    @Override
     public void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception {
         IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
         sendMessageToChannel(messagingChannel, message);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index f347b77..28b7fb3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -2471,6 +2471,23 @@
                         + cUnit.getName() + "_qbc.adm");
     }
 
+    protected URI createLocalOnlyEndpointURI(String pathAndQuery) throws URISyntaxException {
+        InetSocketAddress endpoint;
+        if (!ncEndPointsList.isEmpty() && (pathAndQuery.equals(Servlets.QUERY_SERVICE)
+                || pathAndQuery.startsWith(Servlets.getAbsolutePath(Servlets.UDF)))) {
+            int endpointIdx = Math.abs(endpointSelector++ % ncEndPointsList.size());
+            endpoint = ncEndPointsList.get(endpointIdx);
+        } else if (isCcEndPointPath(pathAndQuery)) {
+            int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+            endpoint = endpoints.get(endpointIdx);
+        } else {
+            throw new IllegalArgumentException("Invalid local endpoint format");
+        }
+        URI uri = URI.create("http://" + toHostPort("localhost", endpoint.getPort()) + pathAndQuery);
+        LOGGER.debug("Created endpoint URI: " + uri);
+        return uri;
+    }
+
     protected URI createEndpointURI(String pathAndQuery) throws URISyntaxException {
         InetSocketAddress endpoint;
         if (!ncEndPointsList.isEmpty() && (pathAndQuery.equals(Servlets.QUERY_SERVICE)
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 208686c..e628bc7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -36,7 +36,16 @@
      * @param nodeId
      * @throws Exception
      */
-    void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+    boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+
+    /**
+     * Sends the passed message to the specified {@code nodeId} as a real-time (high-priority) message
+     *
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
 
     /**
      * Sends the passed requests to all NCs and wait for the response
@@ -44,10 +53,11 @@
      * @param ncs
      * @param requests
      * @param timeout
+     * @param realTime if true, the requests are sent as real-time messages; otherwise as regular messages
      * @throws Exception
      */
     Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
-            long timeout) throws Exception;
+            long timeout, boolean realTime) throws Exception;
 
     /**
      * respond to a sync request
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 6bd58a9..88905fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,7 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+    void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
 
     /**
      * Sends application message from this NC to the CC.
@@ -37,7 +37,23 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
+    void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
+
+    /**
+     * Sends high-priority application message from this NC to the primary CC.
+     *
+     * @param message
+     * @throws Exception
+     */
+    void sendRealTimeMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+
+    /**
+     * Sends high-priority application message from this NC to the CC identified by {@code ccId}.
+     *
+     * @param message
+     * @throws Exception
+     */
+    void sendRealTimeMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
 
     /**
      * Sends application message from this NC to another NC.
@@ -45,14 +61,14 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception;
+    void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception;
 
     /**
      * Queue a message to this {@link INCMessageBroker} for processing
      *
      * @param msg
      */
-    public void queueReceivedMessage(INcAddressedMessage msg);
+    void queueReceivedMessage(INcAddressedMessage msg);
 
     /**
      * Creates and registers a Future for a message that will be send through this broker
diff --git a/asterixdb/asterix-doc/src/main/markdown/builtins/14_window.md b/asterixdb/asterix-doc/src/main/markdown/builtins/14_window.md
index 67ead24..542abb4 100644
--- a/asterixdb/asterix-doc/src/main/markdown/builtins/14_window.md
+++ b/asterixdb/asterix-doc/src/main/markdown/builtins/14_window.md
@@ -64,52 +64,63 @@
 
 * Example:
 
-    For each author, find the cumulative distribution of all messages
-    in order of message ID.
+    For each customer, find the cumulative distribution of all orders
+    by order number.
 
-        SELECT m.messageId, m.authorId, CUME_DIST() OVER (
-          PARTITION BY m.authorId
-          ORDER BY m.messageId
+        FROM orders AS o
+        SELECT o.custid, o.orderno, CUME_DIST() OVER (
+          PARTITION BY o.custid
+          ORDER BY o.orderno
         ) AS `rank`
-        FROM GleambookMessages AS m;
+        ORDER BY o.custid, o.orderno;
 
 * The expected result is:
 
         [
           {
-            "rank": 0.2,
-            "messageId": 2,
-            "authorId": 1
-          },
-          {
-            "rank": 0.4,
-            "messageId": 4,
-            "authorId": 1
-          },
-          {
-            "rank": 0.6,
-            "messageId": 8,
-            "authorId": 1
-          },
-          {
-            "rank": 0.8,
-            "messageId": 10,
-            "authorId": 1
-          },
-          {
-            "rank": 1,
-            "messageId": 11,
-            "authorId": 1
+            "rank": 0.25,
+            "custid": "C13",
+            "orderno": 1002
           },
           {
             "rank": 0.5,
-            "messageId": 3,
-            "authorId": 2
+            "custid": "C13",
+            "orderno": 1007
+          },
+          {
+            "rank": 0.75,
+            "custid": "C13",
+            "orderno": 1008
           },
           {
             "rank": 1,
-            "messageId": 6,
-            "authorId": 2
+            "custid": "C13",
+            "orderno": 1009
+          },
+          {
+            "rank": 1,
+            "custid": "C31",
+            "orderno": 1003
+          },
+          {
+            "rank": 1,
+            "custid": "C35",
+            "orderno": 1004
+          },
+          {
+            "rank": 1,
+            "custid": "C37",
+            "orderno": 1005
+          },
+          {
+            "rank": 0.5,
+            "custid": "C41",
+            "orderno": 1001
+          },
+          {
+            "rank": 1,
+            "custid": "C41",
+            "orderno": 1006
           }
         ]
 
@@ -131,7 +142,7 @@
     For this function, when any tuples have the same rank, the rank of the next
     tuple will be consecutive, so there will not be a gap in the sequence of
     returned values.
-    For example, if there are three tuples ranked 2, the next dense rank is 3.
+    For example, if there are five tuples ranked 3, the next dense rank is 4.
 
 * Arguments:
 
@@ -149,59 +160,62 @@
 
 * Example:
 
-    For each author, find the dense rank of all messages in order of location.
+    Find the dense rank of all orders by number of items.
 
-        SELECT m.authorId, m.messageId, m.senderLocation[1] as longitude,
+        FROM orders AS o
+        SELECT o.orderno, LEN(o.items) AS items,
         DENSE_RANK() OVER (
-          PARTITION BY m.authorId
-          ORDER BY m.senderLocation[1]
+          ORDER BY LEN(o.items)
         ) AS `rank`
-        FROM GleambookMessages AS m;
+        ORDER BY `rank`, o.orderno;
 
 * The expected result is:
 
         [
           {
+            "items": 0,
             "rank": 1,
-            "authorId": 1,
-            "messageId": 10,
-            "longitude": 70.01
+            "orderno": 1009
           },
           {
+            "items": 1,
             "rank": 2,
-            "authorId": 1,
-            "messageId": 11,
-            "longitude": 77.49
+            "orderno": 1008
           },
           {
+            "items": 2,
             "rank": 3,
-            "authorId": 1,
-            "messageId": 2,
-            "longitude": 80.87
+            "orderno": 1001
           },
           {
+            "items": 2,
             "rank": 3,
-            "authorId": 1,
-            "messageId": 8,
-            "longitude": 80.87
+            "orderno": 1002
           },
           {
+            "items": 2,
+            "rank": 3,
+            "orderno": 1003
+          },
+          {
+            "items": 2,
+            "rank": 3,
+            "orderno": 1004
+          },
+          {
+            "items": 2,
+            "rank": 3,
+            "orderno": 1007
+          },
+          {
+            "items": 3,
             "rank": 4,
-            "authorId": 1,
-            "messageId": 4,
-            "longitude": 97.04
+            "orderno": 1006
           },
           {
-            "rank": 1,
-            "authorId": 2,
-            "messageId": 6,
-            "longitude": 75.56
-          },
-          {
-            "rank": 2,
-            "authorId": 2,
-            "messageId": 3,
-            "longitude": 81.01
+            "items": 4,
+            "rank": 5,
+            "orderno": 1005
           }
         ]
 
@@ -268,61 +282,76 @@
 
 * Example:
 
-    For each author, show the length of each message, including the
-    length of the shortest message from that author.
+    For each order, show the customer and the value, including the
+    value of the smallest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        FIRST_VALUE(LENGTH(m.message)) OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
-        ) AS shortest_message
-        FROM GleambookMessages AS m;
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        FIRST_VALUE(revenue) OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
+        ) AS smallest_order;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "shortest_message": 31,
-            "authorId": 1,
-            "messageId": 8
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "smallest_order": null
           },
           {
-            "message_length": 39,
-            "shortest_message": 31,
-            "authorId": 1,
-            "messageId": 11
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45,
+            "smallest_order": null
           },
           {
-            "message_length": 44,
-            "shortest_message": 31,
-            "authorId": 1,
-            "messageId": 4
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8,
+            "smallest_order": null
           },
           {
-            "message_length": 45,
-            "shortest_message": 31,
-            "authorId": 1,
-            "messageId": 2
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "smallest_order": null
           },
           {
-            "message_length": 51,
-            "shortest_message": 31,
-            "authorId": 1,
-            "messageId": 10
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "smallest_order": 477.95
           },
           {
-            "message_length": 35,
-            "shortest_message": 35,
-            "authorId": 2,
-            "messageId": 3
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "smallest_order": 199.94
           },
           {
-            "message_length": 44,
-            "shortest_message": 35,
-            "authorId": 2,
-            "messageId": 6
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "smallest_order": 4639.92
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73,
+            "smallest_order": 157.73
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58,
+            "smallest_order": 157.73
           }
         ]
 
@@ -380,61 +409,76 @@
 
 * Example:
 
-    For each author, show the length of each message, including the
-    length of the next-shortest message.
+    For each order, show the customer and the value, including the
+    value of the next-smallest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        LAG(LENGTH(m.message), 1, "No shorter message") OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
-        ) AS next_shortest_message
-        FROM GleambookMessages AS m;
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        LAG(revenue, 1, "No smaller order") OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
+        ) AS next_smallest_order;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "authorId": 1,
-            "messageId": 8,
-            "next_shortest_message": "No shorter message"
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "next_smallest_order": "No smaller order"
           },
           {
-            "message_length": 39,
-            "authorId": 1,
-            "messageId": 11,
-            "next_shortest_message": 31
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45,
+            "next_smallest_order": null
           },
           {
-            "message_length": 44,
-            "authorId": 1,
-            "messageId": 4,
-            "next_shortest_message": 39
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8,
+            "next_smallest_order": 130.45
           },
           {
-            "message_length": 45,
-            "authorId": 1,
-            "messageId": 2,
-            "next_shortest_message": 44
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "next_smallest_order": 1999.8
           },
           {
-            "message_length": 51,
-            "authorId": 1,
-            "messageId": 10,
-            "next_shortest_message": 45
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "next_smallest_order": "No smaller order"
           },
           {
-            "message_length": 35,
-            "authorId": 2,
-            "messageId": 3,
-            "next_shortest_message": "No shorter message"
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "next_smallest_order": "No smaller order"
           },
           {
-            "message_length": 44,
-            "authorId": 2,
-            "messageId": 6,
-            "next_shortest_message": 35
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "next_smallest_order": "No smaller order"
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73,
+            "next_smallest_order": "No smaller order"
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58,
+            "next_smallest_order": 157.73
           }
         ]
 
@@ -503,62 +547,77 @@
 
 * Example:
 
-    For each author, show the length of each message, including the
-    length of the longest message from that author.
+    For each order, show the customer and the value, including the
+    value of the largest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        LAST_VALUE(LENGTH(m.message)) OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        LAST_VALUE(revenue) OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- ➊
-        ) AS longest_message
-        FROM GleambookMessages AS m;
+        ) AS largest_order;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "longest_message": 51,
-            "authorId": 1,
-            "messageId": 8
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "largest_order": 10906.55
           },
           {
-            "message_length": 39,
-            "longest_message": 51,
-            "authorId": 1,
-            "messageId": 11
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45,
+            "largest_order": 10906.55
           },
           {
-            "message_length": 44,
-            "longest_message": 51,
-            "authorId": 1,
-            "messageId": 4
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8,
+            "largest_order": 10906.55
           },
           {
-            "message_length": 45,
-            "longest_message": 51,
-            "authorId": 1,
-            "messageId": 2
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "largest_order": 10906.55
           },
           {
-            "message_length": 51,
-            "longest_message": 51,
-            "authorId": 1,
-            "messageId": 10
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "largest_order": 477.95
           },
           {
-            "message_length": 35,
-            "longest_message": 44,
-            "authorId": 2,
-            "messageId": 3
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "largest_order": 199.94
           },
           {
-            "message_length": 44,
-            "longest_message": 44,
-            "authorId": 2,
-            "messageId": 6
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "largest_order": 4639.92
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73,
+            "largest_order": 18847.58
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58,
+            "largest_order": 18847.58
           }
         ]
 
@@ -566,8 +625,8 @@
     end of the window partition.
     Without this clause, the end point of the window frame would always be the
     current tuple.
-    This would mean that the longest message would always be the same as the
-    current message.
+    This would mean that the largest order would always be the same as the
+    current order.
 
 ### lead ###
 
@@ -623,61 +682,76 @@
 
 * Example:
 
-    For each author, show the length of each message, including the
-    length of the next-longest message.
+    For each order, show the customer and the value, including the
+    value of the next-largest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        LEAD(LENGTH(m.message), 1, "No longer message") OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
-        ) AS next_longest_message
-        FROM GleambookMessages AS m;
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        LEAD(revenue, 1, "No larger order") OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
+        ) AS next_largest_order;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "authorId": 1,
-            "messageId": 8,
-            "next_longest_message": 39
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "next_largest_order": 130.45
           },
           {
-            "message_length": 39,
-            "authorId": 1,
-            "messageId": 11,
-            "next_longest_message": 44
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45,
+            "next_largest_order": 1999.8
           },
           {
-            "message_length": 44,
-            "authorId": 1,
-            "messageId": 4,
-            "next_longest_message": 45
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8,
+            "next_largest_order": 10906.55
           },
           {
-            "message_length": 45,
-            "authorId": 1,
-            "messageId": 2,
-            "next_longest_message": 51
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "next_largest_order": "No larger order"
           },
           {
-            "message_length": 51,
-            "authorId": 1,
-            "messageId": 10,
-            "next_longest_message": "No longer message"
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "next_largest_order": "No larger order"
           },
           {
-            "message_length": 35,
-            "authorId": 2,
-            "messageId": 3,
-            "next_longest_message": 44
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "next_largest_order": "No larger order"
           },
           {
-            "message_length": 44,
-            "authorId": 2,
-            "messageId": 6,
-            "next_longest_message": "No longer message"
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "next_largest_order": "No larger order"
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73,
+            "next_largest_order": 18847.58
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58,
+            "next_largest_order": "No larger order"
           }
         ]
 
@@ -756,62 +830,77 @@
 
 * Example 1:
 
-    For each author, show the length of each message, including the
-    length of the second shortest message from that author.
+    For each order, show the customer and the value, including the
+    value of the second smallest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        NTH_VALUE(LENGTH(m.message), 2) FROM FIRST OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        NTH_VALUE(revenue, 2) FROM FIRST OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- ➊
-        ) AS shortest_message_but_1
-        FROM GleambookMessages AS m;
+        ) AS smallest_order_but_1;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "shortest_message_but_1": 39,
-            "authorId": 1,
-            "messageId": 8
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "smallest_order_but_1": 130.45
           },
           {
-            "message_length": 39,
-            "shortest_message_but_1": 39,
-            "authorId": 1,
-            "messageId": 11 // ➋
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45, // ➋
+            "smallest_order_but_1": 130.45
           },
           {
-            "message_length": 44,
-            "shortest_message_but_1": 39,
-            "authorId": 1,
-            "messageId": 4
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8,
+            "smallest_order_but_1": 130.45
           },
           {
-            "message_length": 45,
-            "shortest_message_but_1": 39,
-            "authorId": 1,
-            "messageId": 2
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "smallest_order_but_1": 130.45
           },
           {
-            "message_length": 51,
-            "shortest_message_but_1": 39,
-            "authorId": 1,
-            "messageId": 10
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "smallest_order_but_1": null
           },
           {
-            "message_length": 35,
-            "shortest_message_but_1": 44,
-            "authorId": 2,
-            "messageId": 3
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "smallest_order_but_1": null
           },
           {
-            "message_length": 44,
-            "shortest_message_but_1": 44,
-            "authorId": 2,
-            "messageId": 6 // ➋
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "smallest_order_but_1": null
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73,
+            "smallest_order_but_1": 18847.58
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58, // ➋
+            "smallest_order_but_1": 18847.58
           }
         ]
 
@@ -819,69 +908,84 @@
     end of the window partition.
     Without this clause, the end point of the window frame would always be the
     current tuple.
-    This would mean that for the shortest message, the function
-    would be unable to find the route with the second shortest message.
+    This would mean that for the smallest order, the function
+    would be unable to find the second smallest order.
 
-    ➁ The second shortest message from this author.
+    ➁ The second smallest order from this customer.
 
 * Example 2:
 
-    For each author, show the length of each message, including the
-    length of the second longest message from that author.
+    For each order, show the customer and the value, including the
+    value of the second largest order from that customer.
 
-        SELECT m.authorId, m.messageId,
-        LENGTH(m.message) AS message_length,
-        NTH_VALUE(LENGTH(m.message), 2) FROM LAST OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno, revenue,
+        NTH_VALUE(revenue, 2) FROM LAST OVER (
+          PARTITION BY o.custid
+          ORDER BY revenue
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -- ➊
-        ) AS longest_message_but_1
-        FROM GleambookMessages AS m;
+        ) AS largest_order_but_1;
 
 * The expected result is:
 
         [
           {
-            "message_length": 31,
-            "longest_message_but_1": 45,
-            "authorId": 1,
-            "messageId": 8
+            "custid": "C13",
+            "orderno": 1002,
+            "revenue": 10906.55,
+            "largest_order_but_1": 1999.8
           },
           {
-            "message_length": 39,
-            "longest_message_but_1": 45,
-            "authorId": 1,
-            "messageId": 11
+            "custid": "C13",
+            "orderno": 1008,
+            "revenue": 1999.8, // ➋
+            "largest_order_but_1": 1999.8
           },
           {
-            "message_length": 44,
-            "longest_message_but_1": 45,
-            "authorId": 1,
-            "messageId": 4
+            "custid": "C13",
+            "orderno": 1007,
+            "revenue": 130.45,
+            "largest_order_but_1": 1999.8
           },
           {
-            "message_length": 45,
-            "longest_message_but_1": 45,
-            "authorId": 1,
-            "messageId": 2 // ➋
+            "custid": "C13",
+            "orderno": 1009,
+            "revenue": null,
+            "largest_order_but_1": 1999.8
           },
           {
-            "message_length": 51,
-            "longest_message_but_1": 45,
-            "authorId": 1,
-            "messageId": 10
+            "custid": "C31",
+            "orderno": 1003,
+            "revenue": 477.95,
+            "largest_order_but_1": null
           },
           {
-            "message_length": 35,
-            "longest_message_but_1": 35,
-            "authorId": 2,
-            "messageId": 3 // ➋
+            "custid": "C35",
+            "orderno": 1004,
+            "revenue": 199.94,
+            "largest_order_but_1": null
           },
           {
-            "message_length": 44,
-            "longest_message_but_1": 35,
-            "authorId": 2,
-            "messageId": 6
+            "custid": "C37",
+            "orderno": 1005,
+            "revenue": 4639.92,
+            "largest_order_but_1": null
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "revenue": 18847.58,
+            "largest_order_but_1": 157.73
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "revenue": 157.73, // ➋
+            "largest_order_but_1": 157.73
           }
         ]
 
@@ -889,10 +993,10 @@
     end of the window partition.
     Without this clause, the end point of the window frame would always be the
     current tuple.
-    This would mean the function would be unable to find the second longest
-    message for shorter messages.
+    This would mean the function would be unable to find the second largest
+    order for smaller orders.
 
-    ➁ The second longest message from this author.
+    ➁ The second largest order from this customer.
 
 ### ntile ###
 
@@ -932,51 +1036,65 @@
 
 * Example:
 
-    Allocate each message to one of three tiles by length and message ID.
+    Allocate each order to one of three tiles by value.
 
-        SELECT m.messageId, LENGTH(m.message) AS `length`,
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.orderno, revenue,
         NTILE(3) OVER (
-          ORDER BY LENGTH(m.message), m.messageId
-        ) AS `ntile`
-        FROM GleambookMessages AS m;
+          ORDER BY revenue
+        ) AS `ntile`;
 
 * The expected result is:
 
         [
           {
-            "length": 31,
             "ntile": 1,
-            "messageId": 8
+            "orderno": 1009,
+            "revenue": null
           },
           {
-            "length": 35,
             "ntile": 1,
-            "messageId": 3
+            "orderno": 1007,
+            "revenue": 130.45
           },
           {
-            "length": 39,
             "ntile": 1,
-            "messageId": 11
+            "orderno": 1001,
+            "revenue": 157.73
           },
           {
-            "length": 44,
             "ntile": 2,
-            "messageId": 4
+            "orderno": 1004,
+            "revenue": 199.94
           },
           {
-            "length": 44,
             "ntile": 2,
-            "messageId": 6
+            "orderno": 1003,
+            "revenue": 477.95
           },
           {
-            "length": 45,
-            "ntile": 3,
-            "messageId": 2
+            "ntile": 2,
+            "orderno": 1008,
+            "revenue": 1999.8
           },
           {
-            "length": 51,
             "ntile": 3,
-            "messageId": 10
+            "orderno": 1005,
+            "revenue": 4639.92
+          },
+          {
+            "ntile": 3,
+            "orderno": 1002,
+            "revenue": 10906.55
+          },
+          {
+            "ntile": 3,
+            "orderno": 1006,
+            "revenue": 18847.58
           }
         ]
 
@@ -1011,52 +1129,61 @@
 
 * Example:
 
-    For each author, find the percentile rank of all messages in order
-    of message ID.
+    For each customer, find the percentile rank of all orders by order number.
 
-        SELECT m.messageId, m.authorId, PERCENT_RANK() OVER (
-          PARTITION BY m.authorId
-          ORDER BY m.messageId
-        ) AS `rank`
-        FROM GleambookMessages AS m;
+        FROM orders AS o
+        SELECT o.custid, o.orderno, PERCENT_RANK() OVER (
+          PARTITION BY o.custid
+          ORDER BY o.orderno
+        ) AS `rank`;
 
 * The expected result is:
 
         [
           {
             "rank": 0,
-            "messageId": 2,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1002
           },
           {
-            "rank": 0.25,
-            "messageId": 4,
-            "authorId": 1
+            "rank": 0.3333333333333333,
+            "custid": "C13",
+            "orderno": 1007
           },
           {
-            "rank": 0.5,
-            "messageId": 8,
-            "authorId": 1
-          },
-          {
-            "rank": 0.75,
-            "messageId": 10,
-            "authorId": 1
+            "rank": 0.6666666666666666,
+            "custid": "C13",
+            "orderno": 1008
           },
           {
             "rank": 1,
-            "messageId": 11,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1009
           },
           {
             "rank": 0,
-            "messageId": 3,
-            "authorId": 2
+            "custid": "C31",
+            "orderno": 1003
+          },
+          {
+            "rank": 0,
+            "custid": "C35",
+            "orderno": 1004
+          },
+          {
+            "rank": 0,
+            "custid": "C37",
+            "orderno": 1005
+          },
+          {
+            "rank": 0,
+            "custid": "C41",
+            "orderno": 1001
           },
           {
             "rank": 1,
-            "messageId": 6,
-            "authorId": 2
+            "custid": "C41",
+            "orderno": 1006
           }
         ]
 
@@ -1077,7 +1204,7 @@
     When any tuples have the same rank, the rank of the next tuple will include
     all preceding tuples, so there may be a gap in the sequence of returned
     values.
-    For example, if there are three tuples ranked 2, the next rank is 5.
+    For example, if there are five tuples ranked 3, the next rank is 8.
 
     To avoid gaps in the returned values, use the DENSE_RANK() function instead.
 
@@ -1097,59 +1224,61 @@
 
 * Example:
 
-    For each author, find the rank of all messages in order of location.
+    Find the rank of all orders by number of items.
 
-        SELECT m.authorId, m.messageId, m.senderLocation[1] as longitude,
+        FROM orders AS o
+        SELECT o.orderno, LEN(o.items) AS items,
         RANK() OVER (
-          PARTITION BY m.authorId
-          ORDER BY m.senderLocation[1]
-        ) AS `rank`
-        FROM GleambookMessages AS m;
+          ORDER BY LEN(o.items)
+        ) AS `rank`;
 
 * The expected result is:
 
         [
           {
+            "items": 0,
             "rank": 1,
-            "authorId": 1,
-            "messageId": 10,
-            "longitude": 70.01
+            "orderno": 1009
           },
           {
+            "items": 1,
             "rank": 2,
-            "authorId": 1,
-            "messageId": 11,
-            "longitude": 77.49
+            "orderno": 1008
           },
           {
+            "items": 2,
             "rank": 3,
-            "authorId": 1,
-            "messageId": 2,
-            "longitude": 80.87
+            "orderno": 1004
           },
           {
+            "items": 2,
             "rank": 3,
-            "authorId": 1,
-            "messageId": 8,
-            "longitude": 80.87
+            "orderno": 1007
           },
           {
-            "rank": 5,
-            "authorId": 1,
-            "messageId": 4,
-            "longitude": 97.04
+            "items": 2,
+            "rank": 3,
+            "orderno": 1002
           },
           {
-            "rank": 1,
-            "authorId": 2,
-            "messageId": 6,
-            "longitude": 75.56
+            "items": 2,
+            "rank": 3,
+            "orderno": 1001
           },
           {
-            "rank": 2,
-            "authorId": 2,
-            "messageId": 3,
-            "longitude": 81.01
+            "items": 2,
+            "rank": 3,
+            "orderno": 1003
+          },
+          {
+            "items": 3,
+            "rank": 8,
+            "orderno": 1006
+          },
+          {
+            "items": 4,
+            "rank": 9,
+            "orderno": 1005
           }
         ]
 
@@ -1187,52 +1316,66 @@
 
 * Example:
 
-    For each author, calculate the length of each message as a
-    fraction of the total length of all messages.
+    For each customer, calculate the value of each order as a
+    fraction of the total value of all orders from that customer.
 
-        SELECT m.messageId, m.authorId,
-        RATIO_TO_REPORT(LENGTH(m.message)) OVER (
-          PARTITION BY m.authorId
-        ) AS length_ratio
-        FROM GleambookMessages AS m;
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno,
+        RATIO_TO_REPORT(revenue) OVER (
+          PARTITION BY o.custid
+        ) AS fractional_ratio;
 
 * The expected result is:
 
         [
           {
-            "length_ratio": 0.21428571428571427,
-            "messageId": 2,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1007,
+            "fractional_ratio": 0.010006289887088855
           },
           {
-            "length_ratio": 0.20952380952380953,
-            "messageId": 4,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1002,
+            "fractional_ratio": 0.8365971710849288
           },
           {
-            "length_ratio": 0.14761904761904762,
-            "messageId": 8,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1009,
+            "fractional_ratio": null
           },
           {
-            "length_ratio": 0.24285714285714285,
-            "messageId": 10,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1008,
+            "fractional_ratio": 0.15339653902798234
           },
           {
-            "length_ratio": 0.18571428571428572,
-            "messageId": 11,
-            "authorId": 1
+            "custid": "C31",
+            "orderno": 1003,
+            "fractional_ratio": 1
           },
           {
-            "length_ratio": 0.4430379746835443,
-            "messageId": 3,
-            "authorId": 2
+            "custid": "C35",
+            "orderno": 1004,
+            "fractional_ratio": 1
           },
           {
-            "length_ratio": 0.5569620253164557,
-            "messageId": 6,
-            "authorId": 2
+            "custid": "C37",
+            "orderno": 1005,
+            "fractional_ratio": 1
+          },
+          {
+            "custid": "C41",
+            "orderno": 1006,
+            "fractional_ratio": 0.9917007404772666
+          },
+          {
+            "custid": "C41",
+            "orderno": 1001,
+            "fractional_ratio": 0.008299259522733382
           }
         ]
 
@@ -1265,52 +1408,66 @@
 
 * Example:
 
-    For each author, number all messages in order of length.
+    For each customer, number all orders by value.
 
-        SELECT m.messageId, m.authorId,
+        FROM orders AS o
+        LET revenue = ROUND((
+          FROM o.items
+          SELECT VALUE SUM(qty * price)
+        )[0], 2)
+        SELECT o.custid, o.orderno,
         ROW_NUMBER() OVER (
-          PARTITION BY m.authorId
-          ORDER BY LENGTH(m.message)
-        ) AS `row`
-        FROM GleambookMessages AS m;
+          PARTITION BY o.custid
+          ORDER BY revenue
+        ) AS `row`;
 
 * The expected result is:
 
         [
           {
             "row": 1,
-            "messageId": 8,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1009
           },
           {
             "row": 2,
-            "messageId": 11,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1007
           },
           {
             "row": 3,
-            "messageId": 4,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1008
           },
           {
             "row": 4,
-            "messageId": 2,
-            "authorId": 1
-          },
-          {
-            "row": 5,
-            "messageId": 10,
-            "authorId": 1
+            "custid": "C13",
+            "orderno": 1002
           },
           {
             "row": 1,
-            "messageId": 3,
-            "authorId": 2
+            "custid": "C31",
+            "orderno": 1003
+          },
+          {
+            "row": 1,
+            "custid": "C35",
+            "orderno": 1004
+          },
+          {
+            "row": 1,
+            "custid": "C37",
+            "orderno": 1005
+          },
+          {
+            "row": 1,
+            "custid": "C41",
+            "orderno": 1001
           },
           {
             "row": 2,
-            "messageId": 6,
-            "authorId": 2
+            "custid": "C41",
+            "orderno": 1006
           }
         ]
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 6198acc..b64f779 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -51,7 +51,7 @@
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
             }
-            broker.sendApplicationMessageToNC(response, src);
+            broker.sendRealTimeApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
index 4e1c3b1..2d9acf4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -51,7 +51,7 @@
             ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
             long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
             TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested);
-            broker.sendApplicationMessageToNC(response, nodeId);
+            broker.sendRealTimeApplicationMessageToNC(response, nodeId);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 42a0d66..a754baf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -66,6 +66,8 @@
 
     void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
+    void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+
     void takeThreadDump(String requestId) throws Exception;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0b85c4e..ff5dd33 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -134,6 +134,13 @@
     }
 
     @Override
+    public void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId)
+            throws Exception {
+        // 'true' presumably sets the function's real-time flag (cf. isRealTime() on the NC side) -- confirm
+        ipcHandle.send(-1, new SendApplicationMessageFunction(data, deploymentId, true, nodeId), null);
+    }
+
+    @Override
     public void takeThreadDump(String requestId) throws Exception {
         ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
         ipcHandle.send(-1, fn, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e4c1d30..08a18f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -61,8 +61,13 @@
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ncs.getWorkQueue().schedule(
-                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
+                ApplicationMessageWork amfw =
+                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId());
+                if (amf.isRealTime()) {
+                    ncs.getExecutor().submit(amfw);
+                } else {
+                    ncs.getWorkQueue().schedule(amfw);
+                }
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;