fixed log messages
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index b197765..6b79958 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -539,7 +539,10 @@
int nodegroupCardinality = -1;
String nodegroupName;
String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
- if (hintValue != null) {
+ if (hintValue == null) {
+ nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+ return nodegroupName;
+ } else {
int numChosen = 0;
boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
@@ -578,10 +581,9 @@
}
nodegroupName = dataverse + ":" + dd.getName().getValue();
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
- } else {
- nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+ return nodegroupName;
}
- return nodegroupName;
+
}
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 7e33c9a..d7bb7f5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index f36639f..ab8999a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -102,9 +101,6 @@
adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
writer.fail();
FeedManager.INSTANCE.deregisterFeed(feedId);
- /*
- * Do not de-register feed
- */
} else {
FeedManager.INSTANCE.deregisterFeed(feedId);
throw new HyracksDataException(ie);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 477aa30..bd7e0f6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -34,6 +34,10 @@
private Map<FeedConnectionId, FeedRuntimeManager> feedRuntimeManagers = new HashMap<FeedConnectionId, FeedRuntimeManager>();
+ public FeedRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId) {
+ return feedRuntimeManagers.get(feedId);
+ }
+
public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
FeedRuntimeManager mgr = feedRuntimeManagers.get(feedId);
return mgr == null ? null : mgr.getExecutorService();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
index 12b22a9..3bcae80 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -6,9 +6,12 @@
import java.net.UnknownHostException;
import java.nio.CharBuffer;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
public class FeedMessageService {
+ private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
private static final char EOL = (char) "\n".getBytes()[0];
private final FeedConnectionId feedId;
@@ -54,13 +57,21 @@
}
} catch (Exception e) {
e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
+ }
} finally {
- System.out.println("STOPPING MESSAGE HANDLER");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Stopping feed message handler");
+ }
if (sfmSocket != null) {
try {
sfmSocket.close();
} catch (Exception e) {
e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in closing socket " + e.getMessage());
+ }
}
}
}
@@ -83,13 +94,12 @@
}
}
SuperFeedManager sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
-
- System.out.println(" OBTAINED SFM DETAILS WILL TRY TO CONNECT " + sfm);
-
try {
- sfmDirServiceSocket = new Socket(sfm.getHost(), sfm.getPort());
- System.out.println(" CONNECTED TO " + sfm.getHost() + " " + sfm.getPort());
-
+ FeedRuntimeManager runtimeManager = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
+ sfmDirServiceSocket = runtimeManager.createClientSocket(sfm.getHost(), sfm.getPort());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Connected to Super Feed Manager service " + sfm.getHost() + " " + sfm.getPort());
+ }
while (!sfmDirServiceSocket.isConnected()) {
Thread.sleep(2000);
}
@@ -103,9 +113,11 @@
buffer.flip();
String s = new String(buffer.array());
int port = Integer.parseInt(s.trim());
- System.out.println("OBTAINED PORT " + port + " WILL CONNECT AT " + sfm.getHost() + " " + port);
-
- sfmSocket = new Socket(sfm.getHost(), port);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Response from Super Feed Manager service " + port + " will connect at "
+ + sfm.getHost() + " " + port);
+ }
+ sfmSocket = runtimeManager.createClientSocket(sfm.getHost(), port);
} catch (Exception e) {
System.out.println(" COULT NOT CONNECT TO " + sfm);
e.printStackTrace();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 95b0da8..201ecf4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -133,7 +133,8 @@
public SuperFeedManagerService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
throws IOException {
- server = new ServerSocket(port);
+ FeedRuntimeManager runtimeManager = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
+ server = runtimeManager.createServerSocket(port);
nextPort = port;
this.inbox = inbox;
this.feedId = feedId;