checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index d33a499..91b4abb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -86,9 +86,7 @@
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -1384,7 +1382,7 @@
FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
.getDatasetName().getValue());
boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
- if (isFeedActive) {
+ if (isFeedActive && !bfs.isForceBegin()) {
throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
+ " is currently ACTIVE. Operation not supported");
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index bbba414..a379a24 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -103,22 +103,22 @@
IOperatorDescriptor feedMessenger;
AlgebricksPartitionConstraint messengerPc;
- List<IFeedMessage> feedMessages = new ArrayList<IFeedMessage>();
+ IFeedMessage feedMessage = null;
switch (controlFeedStatement.getOperationType()) {
case END:
- feedMessages.add(new FeedMessage(MessageType.STOP));
+ feedMessage = new FeedMessage(MessageType.END);
break;
case ALTER:
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(controlFeedStatement.getProperties());
- feedMessages.add(new AlterFeedMessage(wrappedProperties));
+ feedMessage = new AlterFeedMessage(wrappedProperties);
break;
}
try {
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildFeedMessengerRuntime(
metadataProvider, spec, (FeedDatasetDetails) dataset.getDatasetDetails(), dataverseName,
- datasetName, feedMessages, feedActivity);
+ datasetName, feedMessage, feedActivity);
feedMessenger = p.first;
messengerPc = p.second;
} catch (AlgebricksException e) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0e22cc0..6461b0c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -82,7 +82,6 @@
if (!work.isEmpty()) {
executeWorkSet(work);
}
-
}
@@ -187,17 +186,28 @@
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Unable to add NC: no more available nodes");
}
+
}
}
for (AddNodeWork w : nodeAdditionRequests) {
int n = w.getNumberOfNodes();
List<String> nodesToBeAddedForWork = new ArrayList<String>();
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < n && i < addedNodes.size(); i++) {
nodesToBeAddedForWork.add(addedNodes.get(i));
}
- AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
- pendingWorkResponses.add(response);
+ if (nodesToBeAddedForWork.isEmpty()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Unable to satisfy request by " + w);
+ }
+ AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+ response.setStatus(Status.FAILURE);
+ w.getSourceSubscriber().notifyRequestCompletion(response);
+
+ } else {
+ AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+ pendingWorkResponses.add(response);
+ }
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index dc7dec6..15e6cd5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -52,6 +52,7 @@
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
@@ -74,6 +75,7 @@
import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
@@ -85,6 +87,7 @@
private LinkedBlockingQueue<Message> jobEventInbox;
private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+
private State state;
private FeedLifecycleListener() {
@@ -223,6 +226,7 @@
List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+ List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
@@ -236,6 +240,8 @@
computeOperatorIds.add(entry.getKey());
}
}
+ } else if (entry.getValue() instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ storageOperatorIds.add(entry.getKey());
}
}
@@ -257,6 +263,10 @@
feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
}
}
+ StringBuilder storageLocs = new StringBuilder();
+ for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+ feedInfo.storageLocations.addAll(info.getOperatorLocations().get(storageOpId));
+ }
for (String ingestLoc : feedInfo.ingestLocations) {
ingestLocs.append(ingestLoc);
@@ -266,9 +276,14 @@
computeLocs.append(computeLoc);
computeLocs.append(",");
}
+ for (String storageLoc : feedInfo.storageLocations) {
+ storageLocs.append(storageLoc);
+ storageLocs.append(",");
+ }
feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
@@ -336,6 +351,7 @@
public JobSpecification jobSpec;
public List<String> ingestLocations = new ArrayList<String>();
public List<String> computeLocations = new ArrayList<String>();
+ public List<String> storageLocations = new ArrayList<String>();
public JobInfo jobInfo;
public String feedPolicy;
@@ -376,6 +392,7 @@
}
private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
+ reportFeedFailure(failureReport);
Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
@@ -397,8 +414,13 @@
failuresBecauseOfThisNode.put(feedInfo, feedF);
}
feedF.add(feedFailure.failureType);
+
break;
case STORAGE_NODE:
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unrecoverable situation! lost storage node for the feed " + feedInfo.feedId);
+ }
+ break;
}
}
}
@@ -409,6 +431,38 @@
return work;
}
+ private void reportFeedFailure(FeedFailureReport failureReport) {
+ MetadataTransactionContext ctx = null;
+ FeedActivity fa = null;
+ Map<String, String> feedActivityDetails = new HashMap<String, String>();
+ StringBuilder builder = new StringBuilder();
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ for (Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
+ FeedInfo feedInfo = entry.getKey();
+ List<FeedFailure> feedFailures = entry.getValue();
+ for (FeedFailure failure : feedFailures) {
+ builder.append(failure + ",");
+ }
+ builder.deleteCharAt(builder.length() - 1);
+ feedActivityDetails.put(FeedActivityDetails.FEED_NODE_FAILURE, builder.toString());
+ fa = new FeedActivity(feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(),
+ FeedActivityType.FEED_FAILURE, feedActivityDetails);
+ MetadataManager.INSTANCE.registerFeedActivity(ctx, feedInfo.feedId, fa);
+ }
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception e2) {
+ e2.addSuppressed(e);
+ throw new IllegalStateException("Unable to abort transaction " + e2);
+ }
+ }
+ }
+ }
+
public static class FeedFailure {
public enum FailureType {
@@ -424,6 +478,11 @@
this.failureType = failureType;
this.nodeId = nodeId;
}
+
+ @Override
+ public String toString() {
+ return failureType + " (" + nodeId + ") ";
+ }
}
@Override
@@ -524,6 +583,7 @@
DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
datasetName), feedPolicy, 0);
+ stmt.setForceBegin(true);
List<Statement> statements = new ArrayList<Statement>();
statements.add(dataverseDecl);
statements.add(stmt);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index 8a889c9..6eaf603 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -18,6 +18,7 @@
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -25,6 +26,7 @@
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -58,26 +60,32 @@
AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
switch (resp.getStatus()) {
case FAILURE:
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Request " + resp.getWork() + " not completed");
+ }
break;
case SUCCESS:
- AddNodeWork work = (AddNodeWork) submittedWork;
- FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
- Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
- for (FeedInfo feedInfo : affectedFeeds) {
- try {
- recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Recovered feed:" + feedInfo);
- }
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Unable to recover feed:" + feedInfo);
- }
- }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Request " + resp.getWork() + " completed");
}
break;
}
- resp.getNodesAdded();
+
+ AddNodeWork work = (AddNodeWork) submittedWork;
+ FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
+ Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
+ for (FeedInfo feedInfo : affectedFeeds) {
+ try {
+ recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Recovered feed:" + feedInfo);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to recover feed:" + feedInfo);
+ }
+ }
+ }
break;
case REMOVE_NODE:
break;
@@ -95,15 +103,46 @@
}
}
JobSpecification spec = feedInfo.jobSpec;
- //AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+ System.out.println("ALTERED Job Spec" + spec);
+ Thread.sleep(3000);
+ AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
}
private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
- Random r = new Random();
- String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
- String replacementNode = rnodes[r.nextInt(rnodes.length)];
- Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
- Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
+ String replacementNode = null;
+ switch (resp.getStatus()) {
+ case FAILURE:
+ boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
+ .size() > 1);
+ if (computeNodeSubstitute) {
+ feedInfo.computeLocations.remove(failedNodeId);
+ replacementNode = feedInfo.computeLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
+ } else {
+ replacementNode = feedInfo.storageLocations.get(0);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
+ }
+ break;
+ case SUCCESS:
+ Random r = new Random();
+ String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
+ replacementNode = rnodes[r.nextInt(rnodes.length)];
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Newly added node:" + replacementNode + " chosen to replace " + failedNodeId);
+ }
+
+ break;
+ }
+ replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+ }
+
+ private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+ Map<OperatorDescriptorId, IOperatorDescriptor> opMap = jobSpec.getOperatorMap();
+ Set<Constraint> userConstraints = jobSpec.getUserConstraints();
List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
@@ -161,22 +200,29 @@
}
}
- feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
- feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+ jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+ jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
for (OperatorDescriptorId mopId : modifiedOperators) {
List<Constraint> clist = candidateConstraints.get(mopId);
if (clist != null && !clist.isEmpty()) {
- feedInfo.jobSpec.getUserConstraints().removeAll(clist);
+ jobSpec.getUserConstraints().removeAll(clist);
+
+ for (Constraint c : clist) {
+ if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+ ConstraintExpression cexpr = c.getRValue();
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ newConstraints.get(mopId).add(oldLocation);
+ }
+ }
}
}
for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
OperatorDescriptorId nopId = entry.getKey();
List<String> clist = entry.getValue();
- IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
- clist.toArray(new String[] {}));
+ IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, clist.toArray(new String[] {}));
}
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
index d443165..8262b41 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
@@ -40,6 +40,7 @@
private final String policy;
private Query query;
private int varCounter;
+ private boolean forceBegin = false;
public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
@@ -128,4 +129,12 @@
visitor.visit(this, arg);
}
+ public boolean isForceBegin() {
+ return forceBegin;
+ }
+
+ public void setForceBegin(boolean forceBegin) {
+ this.forceBegin = forceBegin;
+ }
+
}
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 0b1a7c1..4fdae83 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -533,7 +533,8 @@
}
// TODO use fctName.library
- return new FunctionSignature(fctName.dataverse, fctName.function, arity);
+ String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;
+ return new FunctionSignature(fctName.dataverse, fqFunctionName, arity);
}
}
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 5192e06..55e6664 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -22,6 +22,8 @@
<xs:element name="client_port" type="xs:integer" />
<xs:element name="cluster_port" type="xs:integer" />
<xs:element name="http_port" type="xs:integer" />
+ <xs:element name="debug_port" type="xs:integer" />
+
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -44,6 +46,7 @@
<xs:element ref="cl:client_port" />
<xs:element ref="cl:cluster_port" />
<xs:element ref="cl:http_port" />
+ <xs:element ref="cl:debug_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -75,6 +78,7 @@
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
<xs:element ref="cl:store" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
+ <xs:element ref="cl:debug_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
index c503911..2317cfa 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
private static String eventsDir;
private static Events events;
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index fd877e6..82b6e9e 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -63,6 +63,13 @@
if (javaOpts != null) {
builder.append(javaOpts);
}
+ if (node.getDebugPort() != null) {
+ int debugPort = node.getDebugPort().intValue();
+ if (!javaOpts.contains("-Xdebug")) {
+ builder.append((" "
+ + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+ }
+ }
builder.append("\"");
envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
}
@@ -74,6 +81,13 @@
if (javaOpts != null) {
builder.append(javaOpts);
}
+ if (node.getDebugPort() != null) {
+ int debugPort = node.getDebugPort().intValue();
+ if (!javaOpts.contains("-Xdebug")) {
+ builder.append((" "
+ + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+ }
+ }
builder.append("\"");
envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
index 02ddb28..49b7abf 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
@@ -35,13 +35,12 @@
public class EventUtil {
- public static final String EVENTS_DIR = "events";
- public static final String CLUSTER_CONF = "config/cluster.xml";
- public static final String PATTERN_CONF = "config/pattern.xml";
- public static final DateFormat dateFormat = new SimpleDateFormat(
- "yyyy/MM/dd HH:mm:ss");
- public static final String NC_JAVA_OPTS = "nc.java.opts";
- public static final String CC_JAVA_OPTS = "cc.java.opts";
+ public static final String EVENTS_DIR = "events";
+ public static final String CLUSTER_CONF = "config/cluster.xml";
+ public static final String PATTERN_CONF = "config/pattern.xml";
+ public static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ public static final String NC_JAVA_OPTS = "nc.java.opts";
+ public static final String CC_JAVA_OPTS = "cc.java.opts";
private static final String IP_LOCATION = "IP_LOCATION";
private static final String CLUSTER_ENV = "ENV";
@@ -51,22 +50,21 @@
private static final String LOCALHOST = "localhost";
private static final String LOCALHOST_IP = "127.0.0.1";
- public static Cluster getCluster(String clusterConfigurationPath)
- throws JAXBException {
- File file = new File(clusterConfigurationPath);
- JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
- Unmarshaller unmarshaller = ctx.createUnmarshaller();
- Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
- if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
- cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
- }
- for (Node node : cluster.getNode()) {
- if (node.getClusterIp().equals(LOCALHOST)) {
- node.setClusterIp(LOCALHOST_IP);
- }
- }
- return cluster;
- }
+ public static Cluster getCluster(String clusterConfigurationPath) throws JAXBException {
+ File file = new File(clusterConfigurationPath);
+ JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+ Unmarshaller unmarshaller = ctx.createUnmarshaller();
+ Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
+ if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
+ cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
+ }
+ for (Node node : cluster.getNode()) {
+ if (node.getClusterIp().equals(LOCALHOST)) {
+ node.setClusterIp(LOCALHOST_IP);
+ }
+ }
+ return cluster;
+ }
public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
int val = 0;
@@ -193,17 +191,14 @@
return EventDriver.CLIENT_NODE;
}
- if (nodeid.equals(cluster.getMasterNode().getId())) {
- String logDir = cluster.getMasterNode().getLogDir() == null ? cluster
- .getLogDir()
- : cluster.getMasterNode().getLogDir();
- String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster
- .getJavaHome()
- : cluster.getMasterNode().getJavaHome();
- return new Node(cluster.getMasterNode().getId(), cluster
- .getMasterNode().getClusterIp(), javaHome, logDir, null,
- null, null);
- }
+ if (nodeid.equals(cluster.getMasterNode().getId())) {
+ String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
+ .getLogDir();
+ String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
+ .getMasterNode().getJavaHome();
+ return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
+ null, null, null, cluster.getMasterNode().getDebugPort());
+ }
List<Node> nodeList = cluster.getNode();
for (Node node : nodeList) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 1854245..d50ea72 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -40,7 +40,7 @@
* A factory class for creating an instance of HDFSAdapter
*/
@SuppressWarnings("deprecation")
-public class HDFSAdapterFactory extends FileSystemAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 16e6ef7..b29e150 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -38,7 +38,7 @@
* A factory class for creating an instance of HiveAdapter
*/
@SuppressWarnings("deprecation")
-public class HiveAdapterFactory extends FileSystemAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 3d8bedf..5f23f96 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -32,7 +32,7 @@
* NCFileSystemAdapter reads external data residing on the local file system of
* an NC.
*/
-public class NCFileSystemAdapterFactory extends FileSystemAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
similarity index 80%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 0fa428d..1287de4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -9,6 +9,7 @@
import edu.uci.ics.asterix.external.util.DNSResolverFactory;
import edu.uci.ics.asterix.external.util.INodeResolver;
import edu.uci.ics.asterix.external.util.INodeResolverFactory;
+import edu.uci.ics.asterix.metadata.feeds.ConditionalPushTupleParserFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -25,7 +26,10 @@
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public abstract class FileSystemAdapterFactory implements IAdapterFactory {
+public abstract class StreamBasedAdapterFactory implements IAdapterFactory {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
protected Map<String, Object> configuration;
protected static INodeResolver nodeResolver;
@@ -36,13 +40,11 @@
public static final String KEY_DELIMITER = "delimiter";
public static final String KEY_PATH = "path";
public static final String KEY_SOURCE_DATATYPE = "source-datatype";
-
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_ADM = "adm";
-
public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
- private static final Logger LOGGER = Logger.getLogger(FileSystemAdapterFactory.class.getName());
+ public static final String BATCH_SIZE = "batch-size";
+ public static final String BATCH_INTERVAL = "batch-interval";
protected ITupleParserFactory parserFactory;
protected ITupleParser parser;
@@ -56,7 +58,8 @@
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
- protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+ protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+ throws AsterixException {
int n = recordType.getFieldTypes().length;
IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
for (int i = 0; i < n; i++) {
@@ -73,12 +76,16 @@
}
Character delimiter = delimiterValue.charAt(0);
- return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
+
+ return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
+ configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
}
- protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+ protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+ throws AsterixException {
try {
- return new AdmSchemafullRecordParserFactory(recordType);
+ return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
+ : new AdmSchemafullRecordParserFactory(recordType);
} catch (Exception e) {
throw new AsterixException(e);
}
@@ -86,15 +93,21 @@
}
protected void configureFormat(IAType sourceDatatype) throws Exception {
+ String propValue = (String) configuration.get(BATCH_SIZE);
+ int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
+ propValue = (String) configuration.get(BATCH_INTERVAL);
+ long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
+ boolean conditionalPush = batchSize > 0 || batchInterval > 0;
+
String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
if (parserFactoryClassname == null) {
String specifiedFormat = (String) configuration.get(KEY_FORMAT);
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype);
+ parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
} else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
- parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype);
+ parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e3deba9..e162ffe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -150,4 +150,8 @@
return configuration;
}
+ public void setWriter(IFrameWriter writer) {
+
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index a11197b..8efe919 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -99,7 +99,6 @@
}
- @SuppressWarnings("unchecked")
private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
throws IOException, AsterixException {
ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index e17db67..be4debe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.InputStream;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -21,6 +22,7 @@
protected final ITupleParser tupleParser;
protected final IAType sourceDatatype;
protected IHyracksTaskContext ctx;
+ protected AdapterRuntimeManager runtimeManager;
public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
this.tupleParser = parserFactory.createTupleParser(ctx);
@@ -33,4 +35,5 @@
InputStream in = getInputStream(partition);
tupleParser.parse(in, writer);
}
+
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
new file mode 100644
index 0000000..d15d661
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class EchoDelayFactory implements IFunctionFactory {
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() {
+ return new EchoDelayFunction();
+ }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
new file mode 100644
index 0000000..5f9be77
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import java.util.Random;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+
+public class EchoDelayFunction implements IExternalScalarFunction {
+
+ private Random rand = new Random();
+ private long sleepIntervalMin;
+ private long sleepIntervalMax;
+ private int range;
+
+ @Override
+ public void initialize(IFunctionHelper functionHelper) {
+ sleepIntervalMin = 50;
+ sleepIntervalMax = 100;
+ range = (new Long(sleepIntervalMax - sleepIntervalMin)).intValue();
+ }
+
+ @Override
+ public void deinitialize() {
+ }
+
+ @Override
+ public void evaluate(IFunctionHelper functionHelper) throws Exception {
+ JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+ long sleepInterval = rand.nextInt(range);
+ // Thread.sleep(5);
+ functionHelper.setResult(inputRecord);
+ }
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bee6604..6d00029 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -41,5 +41,13 @@
<definition>edu.uci.ics.asterix.external.library.AllTypesFactory
</definition>
</function>
+ <function>
+ <function_type>SCALAR</function_type>
+ <name>echoDelay</name>
+ <arguments>TweetMessageType</arguments>
+ <return_type>TweetMessageType</return_type>
+ <definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
+ </definition>
+ </function>
</functions>
</library>
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
index c5cbe7bf..5035028 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
@@ -102,7 +102,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null);
+ masterNode.getLogDir(), null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 50fb050..eac653b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -317,7 +317,10 @@
"edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
"edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory",
- "edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory", };
+ "edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory",
+ "edu.uci.ics.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
+ "edu.uci.ics.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
+ "edu.uci.ics.asterix.tools.external.data.SyntheticTwitterFeedAdapterFactory"};
DatasourceAdapter adapter;
for (String adapterClassName : builtInAdapterClassNames) {
adapter = getAdapter(adapterClassName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
index 4344ac8..68dcc4c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -20,4 +20,8 @@
return numberOfNodes;
}
+ @Override
+ public String toString() {
+ return WorkType.ADD_NODE + " " + numberOfNodes + " requested by " + subscriber;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index b639559..5fee34f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -104,9 +104,11 @@
Patterns startNCPattern = new Patterns(pattern);
client.submit(startNCPattern);
+ removeNode(cluster.getNode(), node);
+
AsterixInstance instance = lookupService.getAsterixInstance(cluster.getInstanceName());
instance.getCluster().getNode().add(node);
- instance.getCluster().getSubstituteNodes().getNode().remove(node);
+ removeNode(instance.getCluster().getSubstituteNodes().getNode(), node);
lookupService.updateAsterixInstance(instance);
} catch (Exception e) {
@@ -115,6 +117,19 @@
}
+ private void removeNode(List<Node> list, Node node) {
+ Node nodeToRemove = null;
+ for (Node n : list) {
+ if (n.getId().equals(node.getId())) {
+ nodeToRemove = n;
+ break;
+ }
+ }
+ if (nodeToRemove != null) {
+ list.remove(nodeToRemove);
+ }
+ }
+
@Override
public void removeNode(Node node) throws AsterixException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
index 0b15df7..90683d1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -22,4 +22,15 @@
return nodesToBeRemoved;
}
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(WorkType.REMOVE_NODE);
+ for (String node : nodesToBeRemoved) {
+ builder.append(node + " ");
+ }
+ builder.append(" requested by " + subscriber);
+ return builder.toString();
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 775ef9e..3d3bcac 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -63,7 +63,6 @@
import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
@@ -109,7 +108,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -118,7 +116,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -129,7 +126,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -421,7 +417,6 @@
FeedDatasetDetails datasetDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
DatasourceAdapter adapterEntity;
- IDatasourceAdapter adapter;
IAdapterFactory adapterFactory;
IAType adapterOutputType;
String adapterName;
@@ -490,12 +485,12 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
AqlMetadataProvider metadataProvider, JobSpecification jobSpec, FeedDatasetDetails datasetDetails,
- String dataverse, String dataset, List<IFeedMessage> feedMessages, FeedActivity feedActivity)
+ String dataverse, String dataset, IFeedMessage feedMessage, FeedActivity feedActivity)
throws AlgebricksException {
AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS).split(","));
FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
- feedMessages);
+ feedMessage);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 48f16b6..b57f8fe 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -49,11 +49,13 @@
public static class FeedActivityDetails {
public static final String COMPUTE_LOCATIONS = "compute-locations";
public static final String INGEST_LOCATIONS = "ingest-locations";
+ public static final String STORAGE_LOCATIONS = "storage-locations";
public static final String TOTAL_INGESTED = "total-ingested";
public static final String INGESTION_RATE = "ingestion-rate";
public static final String EXCEPTION_LOCATION = "exception-location";
public static final String EXCEPTION_MESSAGE = "exception-message";
public static final String FEED_POLICY_NAME = "feed-policy-name";
+ public static final String FEED_NODE_FAILURE = "feed-node-failure";
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
new file mode 100644
index 0000000..78daa08
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.metadata.feeds.MaterializingFrameWriter.Mode;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class AdapterRuntimeManager implements IAdapterExecutor {
+
+ private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
+
+ private final FeedId feedId;
+
+ private IFeedAdapter feedAdapter;
+
+ private AdapterExecutor adapterExecutor;
+
+ private Thread adapterRuntime;
+
+ private State state;
+
+ private FeedInboxMonitor feedInboxMonitor;
+
+ public enum State {
+ ACTIVE_INGESTION,
+ INACTIVE_INGESTION,
+ FINISHED_INGESTION
+ }
+
+ public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, MaterializingFrameWriter writer,
+ int partition, LinkedBlockingQueue<IFeedMessage> inbox) {
+ this.feedId = feedId;
+ this.feedAdapter = feedAdapter;
+ this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+ this.adapterRuntime = new Thread(adapterExecutor);
+ this.feedInboxMonitor = new FeedInboxMonitor(this, inbox, partition);
+ AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
+ }
+
+ @Override
+ public void start() throws Exception {
+ state = State.ACTIVE_INGESTION;
+ FeedManager.INSTANCE.registerFeedRuntime(this);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
+ }
+ adapterRuntime.start();
+ }
+
+ @Override
+ public void stop() {
+ try {
+ feedAdapter.stop();
+ state = State.FINISHED_INGESTION;
+ synchronized (this) {
+ notifyAll();
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to stop adapter");
+ }
+ }
+ }
+
+ @Override
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+ public IFeedAdapter getFeedAdapter() {
+ return feedAdapter;
+ }
+
+ public void setFeedAdapter(IFeedAdapter feedAdapter) {
+ this.feedAdapter = feedAdapter;
+ }
+
+ public static class AdapterExecutor implements Runnable {
+
+ private MaterializingFrameWriter writer;
+
+ private final int partition;
+
+ private IFeedAdapter adapter;
+
+ private AdapterRuntimeManager runtimeManager;
+
+ public AdapterExecutor(int partition, MaterializingFrameWriter writer, IFeedAdapter adapter,
+ AdapterRuntimeManager adapterRuntimeMgr) {
+ this.writer = writer;
+ this.partition = partition;
+ this.adapter = adapter;
+ this.runtimeManager = adapterRuntimeMgr;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting ingestion for partition:" + partition);
+ }
+ adapter.start(partition, writer);
+ runtimeManager.setState(State.FINISHED_INGESTION);
+ synchronized (runtimeManager) {
+ runtimeManager.notifyAll();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public MaterializingFrameWriter getWriter() {
+ return writer;
+ }
+
+ public void setWriter(IFrameWriter writer) {
+ if (this.writer != null) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switching writer to:" + writer + " from " + this.writer);
+ }
+ this.writer.setWriter(writer);
+ }
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ @SuppressWarnings("incomplete-switch")
+ public void setState(State state) throws HyracksDataException {
+ if (this.state.equals(state)) {
+ return;
+ }
+ switch (state) {
+ case INACTIVE_INGESTION:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("SETTING STORE MODE");
+ }
+ adapterExecutor.getWriter().setMode(Mode.STORE);
+ break;
+ case ACTIVE_INGESTION:
+ adapterExecutor.getWriter().setMode(Mode.FORWARD);
+ break;
+ }
+ this.state = state;
+ }
+
+ public AdapterExecutor getAdapterExecutor() {
+ return adapterExecutor;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 3aa6b05..3db5916 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -8,17 +8,20 @@
public class BuiltinFeedPolicies {
- public static final FeedPolicy MISSTION_CRITICAL = initializeMissionCriticalFeedPolicy();
-
- public static final FeedPolicy ADVANCED = initializeAdvancedFeedPolicy();
-
- public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+ public static final FeedPolicy BRITTLE = initializeBrittlePolicy();
public static final FeedPolicy BASIC = initializeBasicPolicy();
- public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
+ public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
- public static final FeedPolicy DEFAULT_POLICY = ADVANCED;
+ public static final FeedPolicy FAULT_TOLERANT_BASIC_MONITORED = initializeFaultTolerantBasicMonitoredPolicy();
+
+ public static final FeedPolicy ELASTIC = initializeFaultTolerantBasicMonitoredElasticPolicy();
+
+ public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
+ FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
+
+ public static final FeedPolicy DEFAULT_POLICY = FAULT_TOLERANT_BASIC_MONITORED;
public static final String CONFIG_FEED_POLICY_KEY = "policy";
@@ -31,53 +34,74 @@
return null;
}
- private static FeedPolicy initializeMissionCriticalFeedPolicy() {
+ private static FeedPolicy initializeFaultTolerantBasicMonitoredElasticPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "true");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
- policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "MissionCritical";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "MissionCritical", description, policyParams);
+ policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
+ String description = "Basic Monitored Fault-Tolerant Elastic";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMFE", description, policyParams);
}
- private static FeedPolicy initializeBasicPolicy() {
+ private static FeedPolicy initializeFaultTolerantBasicMonitoredPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "false");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "false");
+ policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "Basic";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+ String description = "Basic Monitored Fault-Tolerant";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMF", description, policyParams);
}
private static FeedPolicy initializeBasicMonitoredPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "5");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "BasicMonitored";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicMonitored", description, policyParams);
+ String description = "Basic Monitored Fault-Tolerant";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BM", description, policyParams);
}
- private static FeedPolicy initializeAdvancedFeedPolicy() {
+ private static FeedPolicy initializeBasicPolicy() {
Map<String, String> policyParams = new HashMap<String, String>();
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
- policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
- policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
- policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
- String description = "Advanced";
- return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Advanced", description, policyParams);
+ String description = "Basic";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "B", description, policyParams);
}
+
+ private static FeedPolicy initializeBrittlePolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+ policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
+ policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ String description = "Brittle";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Br", description, policyParams);
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
new file mode 100644
index 0000000..a74627c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -0,0 +1,205 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
+import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class ConditionalPushTupleParserFactory implements ITupleParserFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(ConditionalPushTupleParserFactory.class.getName());
+
+ @Override
+ public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
+ IDataParser dataParser = null;
+ switch (parserType) {
+ case ADM:
+ dataParser = new ADMDataParser();
+ break;
+ case DELIMITED_DATA:
+ dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter);
+ break;
+ }
+ return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
+ }
+
+ private final ARecordType recordType;
+ private final Map<String, Object> configuration;
+ private IValueParserFactory[] valueParserFactories;
+ private char delimiter;
+ private final ParserType parserType;
+ private Object lock;
+
+ public enum ParserType {
+ ADM,
+ DELIMITED_DATA
+ }
+
+ public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
+ char fieldDelimiter, Map<String, Object> configuration) {
+ this.recordType = recordType;
+ this.valueParserFactories = valueParserFactories;
+ this.delimiter = fieldDelimiter;
+ this.configuration = configuration;
+ this.parserType = ParserType.DELIMITED_DATA;
+
+ }
+
+ public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+ this.recordType = recordType;
+ this.configuration = configuration;
+ this.parserType = ParserType.ADM;
+ }
+
+}
+
+class ConditionalPushTupleParser extends AbstractTupleParser {
+
+ private final IDataParser dataParser;
+ private int batchSize;
+ private long batchInterval;
+ private boolean continueIngestion = true;
+ private int tuplesInFrame = 0;
+ private TimeBasedFlushTask flushTask;
+ private Timer timer = new Timer();
+ private Object lock = new Object();
+ private boolean activeTimer = false;
+
+ public static final String BATCH_SIZE = "batch-size";
+ public static final String BATCH_INTERVAL = "batch-interval";
+
+ public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
+ Map<String, Object> configuration) {
+ super(ctx, recType);
+ this.dataParser = dataParser;
+ String propValue = (String) configuration.get(BATCH_SIZE);
+ batchSize = propValue != null ? Integer.parseInt(propValue) : Integer.MAX_VALUE;
+ propValue = (String) configuration.get(BATCH_INTERVAL);
+ batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
+ activeTimer = batchInterval > 0;
+ }
+
+ public void stop() {
+ continueIngestion = false;
+ }
+
+ @Override
+ public IDataParser getDataParser() {
+ return dataParser;
+ }
+
+ @Override
+ public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+ flushTask = new TimeBasedFlushTask(writer, lock);
+ appender.reset(frame, true);
+ IDataParser parser = getDataParser();
+ try {
+ parser.initialize(in, recType, true);
+ if (activeTimer) {
+ timer.schedule(flushTask, 0, batchInterval);
+ }
+ while (continueIngestion) {
+ tb.reset();
+ if (!parser.parse(tb.getDataOutput())) {
+ break;
+ }
+ tb.addFieldEndOffset();
+ addTuple(writer);
+ }
+ if (appender.getTupleCount() > 0) {
+ if (activeTimer) {
+ synchronized (lock) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ } else {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ }
+ } catch (AsterixException ae) {
+ throw new HyracksDataException(ae);
+ } catch (IOException ioe) {
+ throw new HyracksDataException(ioe);
+ } finally {
+ if (activeTimer) {
+ timer.cancel();
+ }
+ }
+ }
+
+ protected void addTuple(IFrameWriter writer) throws HyracksDataException {
+ if (activeTimer) {
+ synchronized (lock) {
+ addTupleToFrame(writer);
+ }
+ } else {
+ addTupleToFrame(writer);
+ }
+ }
+
+ protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
+ if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ if (tuplesInFrame == batchSize) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Batch size exceeded! flushing frame " + "(" + tuplesInFrame + ")");
+ }
+ }
+ tuplesInFrame = 0;
+ }
+ tuplesInFrame++;
+ }
+
+ private class TimeBasedFlushTask extends TimerTask {
+
+ private IFrameWriter writer;
+ private final Object lock;
+
+ public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
+ this.writer = writer;
+ this.lock = lock;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (tuplesInFrame > 0) {
+ synchronized (lock) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+ }
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ tuplesInFrame = 0;
+ }
+ }
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 28d0da3..d007d6f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -52,9 +52,9 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- IDatasourceAdapter adapter;
+ IFeedAdapter adapter;
try {
- adapter = adapterFactory.createAdapter(ctx);
+ adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx);
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 6898ef5..3893e54 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -17,8 +17,11 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -27,52 +30,65 @@
*/
public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private final IDatasourceAdapter adapter;
+ private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+
+ private IFeedAdapter adapter;
private final int partition;
- private final IFeedManager feedManager;
private final FeedId feedId;
private final LinkedBlockingQueue<IFeedMessage> inbox;
- private FeedInboxMonitor feedInboxMonitor;
private final Map<String, String> feedPolicy;
private final FeedPolicyEnforcer policyEnforcer;
+ private AdapterRuntimeManager adapterRuntimeMgr;
- public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, Map<String, String> feedPolicy,
+ public FeedIntakeOperatorNodePushable(FeedId feedId, IFeedAdapter adapter, Map<String, String> feedPolicy,
int partition) {
this.adapter = adapter;
this.partition = partition;
- this.feedManager = (IFeedManager) FeedManager.INSTANCE;
this.feedId = feedId;
inbox = new LinkedBlockingQueue<IFeedMessage>();
this.feedPolicy = feedPolicy;
policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
-
}
@Override
public void open() throws HyracksDataException {
- feedInboxMonitor = new FeedInboxMonitor((IFeedAdapter) adapter, inbox, partition);
- AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
- feedManager.registerFeedMsgQueue(feedId, inbox);
-
- writer.open();
+ adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
try {
- if (adapter instanceof AbstractFeedDatasourceAdapter) {
- ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+ if (adapterRuntimeMgr == null) {
+ MaterializingFrameWriter mWriter = new MaterializingFrameWriter(writer);
+ adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
+ if (adapter instanceof AbstractFeedDatasourceAdapter) {
+ ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Beginning new feed:" + feedId);
+ }
+ mWriter.open();
+ adapterRuntimeMgr.start();
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resuming old feed:" + feedId);
+ }
+ adapter = adapterRuntimeMgr.getFeedAdapter();
+ writer.open();
+ adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
+ adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
}
- adapter.start(partition, writer);
+
+ synchronized (adapterRuntimeMgr) {
+ while (!adapterRuntimeMgr.getState().equals(State.FINISHED_INGESTION)) {
+ adapterRuntimeMgr.wait();
+ }
+ }
+ FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
+ } catch (InterruptedException ie) {
+            // TODO: consult the feed ingestion policy to decide whether state should be retained for later resumption
+ adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
} catch (Exception e) {
e.printStackTrace();
throw new HyracksDataException(e);
} finally {
writer.close();
- if (adapter instanceof IFeedAdapter) {
- try {
- ((IFeedAdapter) adapter).stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- feedManager.unregisterFeedMsgQueue(feedId, inbox);
- }
}
}
@@ -99,11 +115,11 @@
class FeedInboxMonitor extends Thread {
private LinkedBlockingQueue<IFeedMessage> inbox;
- private final IFeedAdapter adapter;
+ private final AdapterRuntimeManager runtimeMgr;
- public FeedInboxMonitor(IFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+ public FeedInboxMonitor(AdapterRuntimeManager runtimeMgr, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
this.inbox = inbox;
- this.adapter = adapter;
+ this.runtimeMgr = runtimeMgr;
}
@Override
@@ -112,11 +128,11 @@
try {
IFeedMessage feedMessage = inbox.take();
switch (feedMessage.getMessageType()) {
- case STOP:
- adapter.stop();
+ case END:
+ runtimeMgr.stop();
break;
case ALTER:
- adapter.alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+ runtimeMgr.getFeedAdapter().alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
break;
}
} catch (InterruptedException ie) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index f9892f2..b3321cb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -14,19 +14,13 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.FeedId;
-import edu.uci.ics.asterix.metadata.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
/**
- * Handle (de-)registration of feeds for delivery of control messages.
+ * Handle (de)registration of feeds for delivery of control messages.
*/
public class FeedManager implements IFeedManager {
@@ -36,39 +30,45 @@
}
- private Map<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>> outGoingMsgQueueMap = new HashMap<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>>();
+ private Map<FeedId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedId, List<AdapterRuntimeManager>>();
@Override
- public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException {
- Set<LinkedBlockingQueue<IFeedMessage>> operatorQueues = outGoingMsgQueueMap.get(feedId);
- try {
- if (operatorQueues != null) {
- for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
- queue.put(feedMessage);
+ public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
+        List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
+        if (adapterRuntimeMgrs == null) {
+            adapterRuntimeMgrs = new ArrayList<AdapterRuntimeManager>();
+            activeFeedRuntimeManagers.put(adapterRuntimeMgr.getFeedId(), adapterRuntimeMgrs);
+        }
+        adapterRuntimeMgrs.add(adapterRuntimeMgr);
+ }
+
+ @Override
+ public synchronized void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
+ List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
+ if (adapterRuntimeMgrs != null && adapterRuntimeMgrs.contains(adapterRuntimeMgr)) {
+ adapterRuntimeMgrs.remove(adapterRuntimeMgr);
+ }
+ }
+
+ @Override
+ public synchronized AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition) {
+ List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(feedId);
+ if (adapterRuntimeMgrs != null) {
+ if (adapterRuntimeMgrs.size() == 1) {
+ return adapterRuntimeMgrs.get(0);
+ } else {
+ for (AdapterRuntimeManager mgr : adapterRuntimeMgrs) {
+ if (mgr.getAdapterExecutor().getPartition() == partition) {
+ return mgr;
+ }
}
}
- } catch (Exception e) {
- throw new AsterixException(e);
}
+ return null;
}
- @Override
- public void registerFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
- Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
- if (feedQueues == null) {
- feedQueues = new HashSet<LinkedBlockingQueue<IFeedMessage>>();
- }
- feedQueues.add(queue);
- outGoingMsgQueueMap.put(feedId, feedQueues);
- }
-
- @Override
- public void unregisterFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
- Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
- if (feedQueues == null || !feedQueues.contains(queue)) {
- throw new IllegalArgumentException(" Unable to de-register feed message queue. Unknown feedId " + feedId);
- }
- feedQueues.remove(queue);
+ public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedId feedId) {
+ return activeFeedRuntimeManagers.get(feedId);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index 677fc40..a302978 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.List;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,20 +29,19 @@
private static final long serialVersionUID = 1L;
private final FeedId feedId;
- private final List<IFeedMessage> feedMessages;
- private final boolean sendToAll = true;
+ private final IFeedMessage feedMessage;
public FeedMessageOperatorDescriptor(JobSpecification spec, String dataverse, String dataset,
- List<IFeedMessage> feedMessages) {
+ IFeedMessage feedMessage) {
super(spec, 0, 1);
this.feedId = new FeedId(dataverse, dataset);
- this.feedMessages = feedMessages;
+ this.feedMessage = feedMessage;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessages, sendToAll, partition, nPartitions);
+ return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index be5c8f9..03daa145 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.ArrayList;
-import java.util.List;
-
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -27,27 +25,33 @@
public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private final FeedId feedId;
- private final List<IFeedMessage> feedMessages;
- private IFeedManager feedManager;
+ private final IFeedMessage feedMessage;
+ private final int partition;
- public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, List<IFeedMessage> feedMessages,
- boolean applyToAll, int partition, int nPartitions) {
+ public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IFeedMessage feedMessage,
+ int partition, int nPartitions) {
this.feedId = feedId;
- if (applyToAll) {
- this.feedMessages = feedMessages;
- } else {
- this.feedMessages = new ArrayList<IFeedMessage>();
- feedMessages.add(feedMessages.get(partition));
- }
- feedManager = (IFeedManager) FeedManager.INSTANCE;
+ this.feedMessage = feedMessage;
+ this.partition = partition;
}
@Override
public void initialize() throws HyracksDataException {
try {
writer.open();
- for (IFeedMessage feedMessage : feedMessages) {
- feedManager.deliverMessage(feedId, feedMessage);
+ AdapterRuntimeManager adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
+ if (adapterRuntimeMgr != null) {
+ switch (feedMessage.getMessageType()) {
+ case END:
+ adapterRuntimeMgr.stop();
+ break;
+ case ALTER:
+ adapterRuntimeMgr.getFeedAdapter().alter(
+ ((AlterFeedMessage) feedMessage).getAlteredConfParams());
+ break;
+ }
+ } else {
+ throw new AsterixException("Unknown feed: " + feedId);
}
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
index 3e16877..a3c505b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
@@ -3,12 +3,14 @@
import java.util.Map;
public class FeedPolicyAccessor {
- public static final String APPLICATION_FAILURE_PERSIST_EXCEPTION = "software.failure.persist.exception";
- public static final String APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION = "software.failure.continue.on.exception";
- public static final String HARDWARE_FAILURE_AUTO_RESTART = "hardware.failure.auto.restart";
- public static final String STATISTICS_COLLECT = "statistics.collect";
- public static final String STATISTICS_COLLECT_PERIOD = "statistics.collect.period";
- public static final String STATISTICS_COLLECT_PERIOD_UNIT = "statistics.collect.period.unit";
+ public static final String FAILURE_LOG_ERROR = "failure.log.error";
+ public static final String APPLICATION_FAILURE_LOG_DATA = "application.failure.log.data";
+ public static final String APPLICATION_FAILURE_CONTINUE = "application.failure.continue";
+ public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+ public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+ public static final String COLLECT_STATISTICS = "collect.statistics";
+ public static final String COLLECT_STATISTICS_PERIOD = "collect.statistics.period";
+ public static final String COLLECT_STATISTICS_PERIOD_UNIT = "collect.statistics.period.unit";
public static final String ELASTIC = "elastic";
public enum TimeUnit {
@@ -24,24 +26,32 @@
this.feedPolicy = feedPolicy;
}
- public boolean persistExceptionDetailsOnApplicationFailure() {
- return getBooleanPropertyValue(APPLICATION_FAILURE_PERSIST_EXCEPTION);
+ public boolean logErrorOnFailure() {
+ return getBooleanPropertyValue(FAILURE_LOG_ERROR);
+ }
+
+ public boolean logDataOnApplicationFailure() {
+ return getBooleanPropertyValue(APPLICATION_FAILURE_LOG_DATA);
}
public boolean continueOnApplicationFailure() {
- return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION);
+ return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE);
}
- public boolean autoRestartOnHardwareFailure() {
- return getBooleanPropertyValue(HARDWARE_FAILURE_AUTO_RESTART);
+ public boolean continueOnHardwareFailure() {
+ return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE);
+ }
+
+ public boolean autoRestartOnClusterReboot() {
+ return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART);
}
public boolean collectStatistics() {
- return getBooleanPropertyValue(STATISTICS_COLLECT);
+ return getBooleanPropertyValue(COLLECT_STATISTICS);
}
public long getStatisicsCollectionPeriodInSecs() {
- return getIntegerPropertyValue(STATISTICS_COLLECT_PERIOD) * getTimeUnitFactor();
+ return getIntegerPropertyValue(COLLECT_STATISTICS_PERIOD) * getTimeUnitFactor();
}
public boolean isElastic() {
@@ -49,7 +59,7 @@
}
private int getTimeUnitFactor() {
- String v = feedPolicy.get(STATISTICS_COLLECT_PERIOD_UNIT);
+ String v = feedPolicy.get(COLLECT_STATISTICS_PERIOD_UNIT);
int factor = 1;
switch (TimeUnit.valueOf(v)) {
case SEC:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
index f708c91..0f8bcee 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -26,7 +26,7 @@
public boolean handleSoftwareFailure(Exception e) throws RemoteException, ACIDException {
boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
- if (feedPolicyAccessor.persistExceptionDetailsOnApplicationFailure()) {
+ if (feedPolicyAccessor.logErrorOnFailure()) {
persistExceptionDetails(e);
}
return continueIngestion;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
new file mode 100644
index 0000000..0f71166
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+public interface IAdapterExecutor {
+
+ public void start() throws Exception;
+
+ public void stop() throws Exception;
+
+ public FeedId getFeedId();
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index a14b95f..9c5a612 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -20,7 +20,7 @@
* Interface implemented by an adapter that can be controlled or managed by external
* commands (stop,alter)
*/
-public interface IFeedAdapter {
+public interface IFeedAdapter extends IDatasourceAdapter {
/**
* Discontinue the ingestion of data and end the feed.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index ae728b2..46302e8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -14,47 +14,26 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-
/**
- * Handle (de-)registration of feeds for delivery of control messages.
+ * Handle (de)registration of feeds for delivery of control messages.
*/
public interface IFeedManager {
/**
- * Register an input message queue for a feed specified by feedId.
- * All messages sent to a feed are directed to the registered queue(s).
- *
- * @param feedId
- * an identifier for the feed dataset.
- * @param queue
- * an input message queue for receiving control messages.
+     * @param adapterRuntimeMgr the runtime manager to register as active for its feed
*/
- public void registerFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+ public void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
/**
- * Unregister an input message queue for a feed specified by feedId.
- * A feed prior to finishing should unregister all previously registered queue(s)
- * as it is no longer active and thus need not process any control messages.
- *
- * @param feedId
- * an identifier for the feed dataset.
- * @param queue
- * an input message queue for receiving control messages.
+     * @param adapterRuntimeMgr the runtime manager to remove from the active-feed registry
*/
- public void unregisterFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+ public void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
/**
- * Deliver a message to a feed with a given feedId.
- *
* @param feedId
- * identifier for the feed dataset.
- * @param feedMessage
- * control message that needs to be delivered.
- * @throws Exception
+     * @param partition the intake partition whose runtime manager is sought
+     * @return the matching AdapterRuntimeManager, or null if none is registered for the feed/partition
*/
- public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException;
+ public AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
index 85a15ab..36a45be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
@@ -19,7 +19,7 @@
public interface IFeedMessage extends Serializable {
public enum MessageType {
- STOP,
+ END,
ALTER,
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
new file mode 100644
index 0000000..b423b89
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MaterializingFrameWriter implements IFrameWriter {
+
+ private static final Logger LOGGER = Logger.getLogger(MaterializingFrameWriter.class.getName());
+
+ private IFrameWriter writer;
+
+ private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+
+ private Mode mode;
+
+ public enum Mode {
+ FORWARD,
+ STORE
+ }
+
+ public MaterializingFrameWriter(IFrameWriter writer) {
+ this.writer = writer;
+ this.mode = Mode.FORWARD;
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public void setMode(Mode newMode) throws HyracksDataException {
+ if (this.mode.equals(newMode)) {
+ return;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switching to :" + newMode + " from " + this.mode);
+ }
+ switch (newMode) {
+ case FORWARD:
+ this.mode = newMode;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Sending accumulated frames :" + frames.size());
+ }
+ break;
+ case STORE:
+ this.mode = newMode;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Beginning to store frames :");
+ LOGGER.info("Frames accumulated till now:" + frames.size());
+ }
+ break;
+ }
+
+ }
+
+ public List<ByteBuffer> getStoredFrames() {
+ return frames;
+ }
+
+ public void clear() {
+ frames.clear();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ switch (mode) {
+ case FORWARD:
+ writer.nextFrame(buffer);
+ if (frames.size() > 0) {
+ for (ByteBuffer buf : frames) {
+                        LOGGER.info("Flushing OLD frame: " + buf);
+ writer.nextFrame(buf);
+ }
+ }
+ frames.clear();
+ break;
+ case STORE:
+ ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
+ storageBuffer.put(buffer);
+ frames.add(storageBuffer);
+ storageBuffer.flip();
+ break;
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ public IFrameWriter getWriter() {
+ return writer;
+ }
+
+ public void setWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public String toString() {
+ return "MaterializingFrameWriter using " + writer;
+ }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index f764954..96cba42 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -114,8 +114,8 @@
}
public Node getAvailableSubstitutionNode() {
- List<Node> subNodes = cluster.getSubstituteNodes().getNode();
- return subNodes.isEmpty() ? null : subNodes.get(0);
+ List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
+ return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
index f2d9384..25cb703 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -36,6 +38,8 @@
*/
public abstract class AbstractTupleParser implements ITupleParser {
+ protected static Logger LOGGER = Logger.getLogger(AbstractTupleParser.class.getName());
+
protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
protected DataOutput dos = tb.getDataOutput();
protected final FrameTupleAppender appender;
@@ -68,6 +72,7 @@
addTupleToFrame(writer);
}
if (appender.getTupleCount() > 0) {
+
FrameUtils.flushFrame(frame, writer);
}
} catch (AsterixException ae) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 39537a8..0c93564 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -14,6 +14,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -182,7 +183,8 @@
Message message = randMessageGen.getNextRandomMessage();
Point location = randLocationGen.getRandomPoint();
DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(twMessageId + "", twUser, location, sendTime, message.getReferredTopics(), message);
+ twMessage.reset(UUID.randomUUID().toString(), twUser, location, sendTime, message.getReferredTopics(),
+ message);
twMessageId++;
if (twUserId > numTwOnlyUsers) {
twUserId = 1;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
new file mode 100644
index 0000000..b693073
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -0,0 +1,97 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String KEY_PORT = "port";
+
+ private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
+
+ private Map<String, Object> configuration;
+
+ private SocketFeedServer socketFeedServer;
+
+ private static final int DEFAULT_PORT = 2909;
+
+ public GenericSocketFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+ ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+ super(parserFactory, outputtype, ctx);
+ this.configuration = configuration;
+ String portValue = (String) this.configuration.get(KEY_PORT);
+ int port = portValue != null ? Integer.parseInt(portValue) : DEFAULT_PORT;
+ this.socketFeedServer = new SocketFeedServer(configuration, outputtype, port);
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ super.start(partition, writer);
+ }
+
+ @Override
+ public InputStream getInputStream(int partition) throws IOException {
+ return socketFeedServer.getInputStream();
+ }
+
+ private static class SocketFeedServer {
+ private ServerSocket serverSocket;
+ private InputStream inputStream;
+
+ public SocketFeedServer(Map<String, Object> configuration, ARecordType outputtype, int port)
+ throws IOException, AsterixException {
+ try {
+ serverSocket = new ServerSocket(port);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("port: " + port + " unusable ");
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Feed server configured to use port: " + port);
+ }
+ }
+
+ public InputStream getInputStream() {
+ Socket socket;
+ try {
+ socket = serverSocket.accept();
+ inputStream = socket.getInputStream();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return inputStream;
+ }
+
+ public void stop() throws IOException {
+ serverSocket.close();
+ }
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+ socketFeedServer.stop();
+ }
+
+ @Override
+ public void alter(Map<String, Object> properties) {
+ // TODO Auto-generated method stub
+
+ }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
new file mode 100644
index 0000000..961e519
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory class for creating @see{GenericSocketFeedAdapter} The
+ * adapter listens at a port for receiving data (from external world).
+ * Data received is transformed into Asterix Data Format (ADM) and stored into
+ * a dataset as configured in the Adapter configuration.
+ */
+public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private ARecordType outputType;
+
+ @Override
+ public String getName() {
+ return "generic_socket_feed";
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.GENERIC;
+ }
+
+ @Override
+ public SupportedOperation getSupportedOperations() {
+ return SupportedOperation.READ;
+ }
+
+ @Override
+ public void configure(Map<String, Object> configuration) throws Exception {
+ this.configuration = configuration;
+ outputType = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+ this.configureFormat(outputType);
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return new AlgebricksCountPartitionConstraint(1);
+ }
+
+ @Override
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
+ }
+
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return outputType;
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 16812d9..dbe5d9a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -19,7 +19,7 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
@@ -47,7 +47,7 @@
* on the local file system or on HDFS. The feed ends when the content of the
* source file has been ingested.
*/
-public class RateControlledFileSystemBasedAdapterFactory extends FileSystemAdapterFactory {
+public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String KEY_FILE_SYSTEM = "fs";
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 5ba983e..0e58ed2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -115,12 +115,12 @@
private void writeTweet(TweetMessage next) {
- //tweet id
+ // tweet id
LOGGER.info("Generating next tweet");
((AMutableString) mutableFields[0]).setValue(next.getTweetid());
mutableRecord.setValueAtPos(0, mutableFields[0]);
- // user
+ // user
AMutableRecord userRecord = ((AMutableRecord) mutableFields[1]);
((AMutableString) userRecord.getValueByPos(0)).setValue(next.getUser().getScreenName());
((AMutableString) userRecord.getValueByPos(1)).setValue("en");
@@ -171,12 +171,10 @@
@Override
public InflowState setNextRecord() throws Exception {
- LOGGER.info("requesting next tweet");
boolean moreData = tweetIterator.hasNext();
if (!moreData) {
return InflowState.NO_MORE_DATA;
- }
- LOGGER.info("writing next tweet");
+ }
writeTweet(tweetIterator.next());
if (tweetInterval != 0) {
tweetCount++;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 92dc394..afb57bf 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -57,7 +57,7 @@
@Override
public String getName() {
- return "synthetic_twitter_feed";
+ return "pull_twitter_feed";
}
@Override
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 7b17923..9cae374 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -5,6 +5,8 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -100,14 +102,15 @@
String tweet = next.toString();
os.write(tweet.getBytes());
os.write(EOL);
- LOGGER.info(tweet);
+ /*
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(tweet);
+ }*/
}
private void writeTweetRecord(TweetMessage next) {
//tweet id
- LOGGER.info("Generating next tweet");
-
((AMutableString) mutableFields[0]).setValue(next.getTweetid());
mutableRecord.setValueAtPos(0, mutableFields[0]);
@@ -162,12 +165,10 @@
@Override
public InflowState setNextRecord() throws Exception {
- LOGGER.info("requesting next tweet");
boolean moreData = tweetIterator.hasNext();
if (!moreData) {
return InflowState.NO_MORE_DATA;
}
- LOGGER.info("writing next tweet");
TweetMessage msg = tweetIterator.next();
if (isOutputFormatRecord) {
writeTweetRecord(msg);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index eef551d..c8fa860 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -7,6 +7,7 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -47,8 +48,7 @@
this.twitterFeedClient = new TweetGenerator(configuration, outputtype, 0,
TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
this.twitterServer = new TwitterServer(configuration, outputtype);
- this.twitterClient = new TwitterClient(PORT);
-
+ this.twitterClient = new TwitterClient(twitterServer.getPort());
}
@Override
@@ -64,12 +64,27 @@
}
private static class TwitterServer {
- private final ServerSocket serverSocket;
+ private ServerSocket serverSocket;
private final Listener listener;
+ private int port = -1;
public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
AsterixException {
- serverSocket = new ServerSocket(PORT);
+ int numAttempts = 0;
+ while (port < 0) {
+ try {
+ serverSocket = new ServerSocket(PORT + numAttempts);
+ port = PORT + numAttempts;
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("port: " + (PORT + numAttempts) + " unusable ");
+ }
+ numAttempts++;
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Twitter server configured to use port: " + port);
+ }
listener = new Listener(serverSocket, configuration, outputtype);
}
@@ -78,6 +93,14 @@
t.start();
}
+ public void stop() {
+ listener.stop();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
}
private static class TwitterClient {
@@ -103,6 +126,7 @@
private final ServerSocket serverSocket;
private Socket socket;
private TweetGenerator tweetGenerator;
+ private boolean continuePush = true;
public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
throws IOException, AsterixException {
@@ -119,7 +143,7 @@
socket = serverSocket.accept();
OutputStream os = socket.getOutputStream();
tweetGenerator.setOutputStream(os);
- while (state.equals(InflowState.DATA_AVAILABLE)) {
+ while (state.equals(InflowState.DATA_AVAILABLE) && continuePush) {
state = tweetGenerator.setNextRecord();
}
os.close();
@@ -137,12 +161,15 @@
}
}
+ public void stop() {
+ continuePush = false;
+ }
+
}
@Override
public void stop() throws Exception {
- // TODO Auto-generated method stub
-
+ twitterServer.stop();
}
@Override
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 11c9172..9500436 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -20,7 +20,7 @@
import java.util.Set;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -43,7 +43,7 @@
* on the local file system or on HDFS. The feed ends when the content of the
* source file has been ingested.
*/
-public class TwitterFirehoseFeedAdapterFactory extends FileSystemAdapterFactory implements ITypedAdapterFactory {
+public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
/**
*
@@ -51,12 +51,13 @@
private static final long serialVersionUID = 1L;
private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+ private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
private static final ARecordType outputType = initOutputType();
@Override
public String getName() {
- return "twitter_firehose_feed";
+ return "twitter_firehose";
}
@Override
@@ -96,14 +97,31 @@
}
List<String> storageNodes = ng.getNodeNames();
Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
- String ingestionLocation = null;
if (nodes.size() > storageNodes.size()) {
nodes.removeAll(storageNodes);
}
+
+ String iCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
+ int iCardinality = iCardinalityParam != null ? Integer.parseInt(iCardinalityParam) : 1;
+ String[] ingestionLocations = new String[iCardinality];
String[] nodesArray = nodes.toArray(new String[] {});
- Random r = new Random();
- ingestionLocation = nodesArray[r.nextInt(nodes.size())];
- return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
+ if (iCardinality > nodes.size()) {
+ for (int i = 0; i < nodesArray.length; i++) {
+ ingestionLocations[i] = nodesArray[i];
+ }
+
+ for (int j = nodesArray.length, k = 0; j < iCardinality; j++, k++) {
+ ingestionLocations[j] = storageNodes.get(k);
+ }
+ } else {
+ Random r = new Random();
+ int ingestLocIndex = r.nextInt(nodes.size());
+ ingestionLocations[0] = nodesArray[ingestLocIndex];
+ for (int i = 1; i < iCardinality; i++) {
+ ingestionLocations[i] = nodesArray[(ingestLocIndex + i) % nodesArray.length];
+ }
+ }
+ return new AlgebricksAbsolutePartitionConstraint(ingestionLocations);
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 745abb3..5e380ff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -121,7 +121,8 @@
// indicates an absence of logs any further.
}
- if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ //if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
return next(currentLogLocator); //should read from memory if there is any further log
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index e67f0164..b8d5312 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -591,7 +591,7 @@
public boolean isMemoryRead(long currentLSN) {
long flushLSN = lastFlushedLSN.get();
- if ((flushLSN + 1) % logPageSize == 0) {
+ if ((flushLSN + 1) == currentLSN) {
return false;
}
long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);