Flush Socket Feed Before Waiting for More Data

Change-Id: Ic37085114592e97b6c5b2d29c5451d816a154aa8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/709
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
index cf8d339..67c4493 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStream.java
@@ -33,6 +33,7 @@
     private ServerSocket server;
     private Socket socket;
     private InputStream connectionStream;
+    private AbstractFeedDataFlowController controller;
 
     public SocketInputStream(ServerSocket server) throws IOException {
         this.server = server;
@@ -73,6 +74,9 @@
         }
         int read = -1;
         try {
+            if (connectionStream.available() < 1) {
+                controller.flush();
+            }
             read = connectionStream.read(b, off, len);
         } catch (IOException e) {
             e.printStackTrace();
@@ -166,5 +170,6 @@
 
     @Override
     public void setController(AbstractFeedDataFlowController controller) {
+        this.controller = controller;
     }
 }