Added asterix project
git-svn-id: https://asterixdb.googlecode.com/svn/trunk/asterix@12 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/assembly/binary-assembly.xml b/asterix-app/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..68d424a
--- /dev/null
+++ b/asterix-app/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <!-- Packages target/appassembler/bin and target/appassembler/lib into the bin/ and lib/ directories of the assembly. -->
+ <id>binary-assembly</id>
+ <!-- The assembly is produced twice: as a zip archive and as a plain directory. -->
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <!-- false: bin/ and lib/ sit at the root of the output, with no enclosing base directory. -->
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <!-- 0755: files copied into bin/ are marked executable. -->
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ADMCursor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ADMCursor.java
new file mode 100644
index 0000000..cad8760
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ADMCursor.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+import edu.uci.ics.asterix.om.base.ABinary;
+import edu.uci.ics.asterix.om.base.ABitArray;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.ACircle;
+import edu.uci.ics.asterix.om.base.ADate;
+import edu.uci.ics.asterix.om.base.ADateTime;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.ADuration;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt16;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AInt8;
+import edu.uci.ics.asterix.om.base.ALine;
+import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.APoint3D;
+import edu.uci.ics.asterix.om.base.APolygon;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.ATime;
+import edu.uci.ics.asterix.om.base.IACollection;
+import edu.uci.ics.asterix.om.base.IACursor;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+/**
+ * This class is the implementation of IADMCursor. It supports iterating over
+ * all objects in ASTERIX: when positioned on a collection, {@link #next()}
+ * walks its elements one by one; when positioned on any other object,
+ * {@link #next()} yields that object exactly once. The object under the cursor
+ * is read through the matching {@code get<Type>()} call.
+ *
+ * @author zheilbron
+ */
+public class ADMCursor implements IADMCursor {
+    /** Object the cursor currently points at; null while the cursor is unpositioned. */
+    protected IAObject currentObject;
+    /** Element cursor of the collection this cursor was positioned on; null otherwise. */
+    protected IACursor collectionCursor;
+    /** True once a non-collection object has been handed out by {@link #next()}. */
+    private boolean readOnce;
+
+    /**
+     * Creates a cursor positioned on the given object.
+     *
+     * @param currentObject
+     *            the object to position on, or null for an unpositioned cursor
+     */
+    public ADMCursor(IAObject currentObject) {
+        setCurrentObject(currentObject);
+    }
+
+    /**
+     * Advances the cursor. On a collection this moves to the next element; on
+     * any other object the first call returns true and later calls return
+     * false.
+     *
+     * @return true if the cursor points at an object after the call
+     * @throws AQLJException
+     */
+    @Override
+    public boolean next() throws AQLJException {
+        if (collectionCursor != null) {
+            boolean next = collectionCursor.next();
+            if (next) {
+                currentObject = collectionCursor.get();
+            }
+            return next;
+        }
+        if (currentObject == null || readOnce) {
+            return false;
+        }
+        readOnce = true;
+        return true;
+    }
+
+    @Override
+    public void position(IADMCursor c) throws AQLJException {
+        ((ADMCursor) c).setCurrentObject(currentObject);
+    }
+
+    @Override
+    public void position(IADMCursor c, String field) throws AQLJException {
+        // A missing field yields null here, which leaves c unpositioned.
+        IAObject o = getObjectByField(field);
+        ((ADMCursor) c).setCurrentObject(o);
+    }
+
+    /**
+     * Looks up a field of the current record.
+     *
+     * @param field
+     *            the field name
+     * @return the field value, or null if the record type has no such field
+     * @throws AQLJException
+     *             if the cursor is unpositioned or the current object is not a
+     *             record
+     */
+    private IAObject getObjectByField(String field) throws AQLJException {
+        if (currentObject == null) {
+            // Previously this surfaced as a NullPointerException.
+            throw new AQLJException("cursor is not positioned on an object");
+        }
+        ATypeTag tag = currentObject.getType().getTypeTag();
+        if (tag != ATypeTag.RECORD) {
+            throw new AQLJException("object of type " + tag + " has no fields");
+        }
+        ARecord curRecord = (ARecord) currentObject;
+        ARecordType t = curRecord.getType();
+        int idx = t.findFieldPosition(field);
+        if (idx == -1) {
+            return null;
+        }
+        return curRecord.getValueByPos(idx);
+    }
+
+    /**
+     * Repositions this cursor on the given object and resets its iteration
+     * state.
+     *
+     * @param o
+     *            the new current object, may be null
+     */
+    public void setCurrentObject(IAObject o) {
+        readOnce = false;
+        currentObject = o;
+        // Always drop the previous collection cursor: otherwise a cursor that
+        // was once positioned on a collection would keep iterating that old
+        // collection after being repositioned on a non-collection object.
+        collectionCursor = null;
+        if (currentObject != null && currentObject.getType() instanceof AbstractCollectionType) {
+            collectionCursor = ((IACollection) currentObject).getCursor();
+        }
+    }
+
+    /**
+     * Verifies that the given object exists and carries the expected type tag.
+     *
+     * @throws AQLJException
+     *             if the object is null or has a different type tag
+     */
+    private void checkTypeTag(IAObject o, ATypeTag expectedTag) throws AQLJException {
+        if (o == null) {
+            throw new AQLJException("cannot get " + expectedTag + " when there is no object");
+        }
+        ATypeTag actualTag = o.getType().getTypeTag();
+        if (actualTag != expectedTag) {
+            throw new AQLJException("cannot get " + expectedTag + " when type is " + actualTag);
+        }
+    }
+
+    /** Returns the current object after checking that it has the expected type tag. */
+    private IAObject currentAs(ATypeTag expectedTag) throws AQLJException {
+        checkTypeTag(currentObject, expectedTag);
+        return currentObject;
+    }
+
+    /** Returns the named field of the current record after checking its type tag. */
+    private IAObject fieldAs(String field, ATypeTag expectedTag) throws AQLJException {
+        IAObject o = getObjectByField(field);
+        if (o == null) {
+            throw new AQLJException("cannot get " + expectedTag + ": no value for field " + field);
+        }
+        checkTypeTag(o, expectedTag);
+        return o;
+    }
+
+    @Override
+    public ABinary getBinary() throws AQLJException {
+        return (ABinary) currentAs(ATypeTag.BINARY);
+    }
+
+    @Override
+    public ABinary getBinary(String field) throws AQLJException {
+        return (ABinary) fieldAs(field, ATypeTag.BINARY);
+    }
+
+    @Override
+    public ABitArray getBitArray() throws AQLJException {
+        return (ABitArray) currentAs(ATypeTag.BITARRAY);
+    }
+
+    @Override
+    public ABitArray getBitArray(String field) throws AQLJException {
+        return (ABitArray) fieldAs(field, ATypeTag.BITARRAY);
+    }
+
+    @Override
+    public ABoolean getBoolean() throws AQLJException {
+        return (ABoolean) currentAs(ATypeTag.BOOLEAN);
+    }
+
+    @Override
+    public ABoolean getBoolean(String field) throws AQLJException {
+        return (ABoolean) fieldAs(field, ATypeTag.BOOLEAN);
+    }
+
+    @Override
+    public ACircle getCircle() throws AQLJException {
+        return (ACircle) currentAs(ATypeTag.CIRCLE);
+    }
+
+    @Override
+    public ACircle getCircle(String field) throws AQLJException {
+        return (ACircle) fieldAs(field, ATypeTag.CIRCLE);
+    }
+
+    @Override
+    public ADate getDate() throws AQLJException {
+        return (ADate) currentAs(ATypeTag.DATE);
+    }
+
+    @Override
+    public ADate getDate(String field) throws AQLJException {
+        return (ADate) fieldAs(field, ATypeTag.DATE);
+    }
+
+    @Override
+    public ADateTime getDateTime() throws AQLJException {
+        return (ADateTime) currentAs(ATypeTag.DATETIME);
+    }
+
+    @Override
+    public ADateTime getDateTime(String field) throws AQLJException {
+        return (ADateTime) fieldAs(field, ATypeTag.DATETIME);
+    }
+
+    @Override
+    public ADouble getDouble() throws AQLJException {
+        return (ADouble) currentAs(ATypeTag.DOUBLE);
+    }
+
+    @Override
+    public ADouble getDouble(String field) throws AQLJException {
+        return (ADouble) fieldAs(field, ATypeTag.DOUBLE);
+    }
+
+    @Override
+    public ADuration getDuration() throws AQLJException {
+        return (ADuration) currentAs(ATypeTag.DURATION);
+    }
+
+    @Override
+    public ADuration getDuration(String field) throws AQLJException {
+        return (ADuration) fieldAs(field, ATypeTag.DURATION);
+    }
+
+    @Override
+    public AFloat getFloat() throws AQLJException {
+        return (AFloat) currentAs(ATypeTag.FLOAT);
+    }
+
+    @Override
+    public AFloat getFloat(String field) throws AQLJException {
+        return (AFloat) fieldAs(field, ATypeTag.FLOAT);
+    }
+
+    @Override
+    public AInt8 getInt8() throws AQLJException {
+        return (AInt8) currentAs(ATypeTag.INT8);
+    }
+
+    @Override
+    public AInt8 getInt8(String field) throws AQLJException {
+        return (AInt8) fieldAs(field, ATypeTag.INT8);
+    }
+
+    @Override
+    public AInt16 getInt16() throws AQLJException {
+        return (AInt16) currentAs(ATypeTag.INT16);
+    }
+
+    @Override
+    public AInt16 getInt16(String field) throws AQLJException {
+        return (AInt16) fieldAs(field, ATypeTag.INT16);
+    }
+
+    @Override
+    public AInt32 getInt32() throws AQLJException {
+        return (AInt32) currentAs(ATypeTag.INT32);
+    }
+
+    @Override
+    public AInt32 getInt32(String field) throws AQLJException {
+        return (AInt32) fieldAs(field, ATypeTag.INT32);
+    }
+
+    @Override
+    public AInt64 getInt64() throws AQLJException {
+        return (AInt64) currentAs(ATypeTag.INT64);
+    }
+
+    @Override
+    public AInt64 getInt64(String field) throws AQLJException {
+        return (AInt64) fieldAs(field, ATypeTag.INT64);
+    }
+
+    @Override
+    public ALine getLine() throws AQLJException {
+        return (ALine) currentAs(ATypeTag.LINE);
+    }
+
+    @Override
+    public ALine getLine(String field) throws AQLJException {
+        return (ALine) fieldAs(field, ATypeTag.LINE);
+    }
+
+    @Override
+    public APoint getPoint() throws AQLJException {
+        return (APoint) currentAs(ATypeTag.POINT);
+    }
+
+    @Override
+    public APoint getPoint(String field) throws AQLJException {
+        return (APoint) fieldAs(field, ATypeTag.POINT);
+    }
+
+    @Override
+    public APoint3D getPoint3D() throws AQLJException {
+        return (APoint3D) currentAs(ATypeTag.POINT3D);
+    }
+
+    @Override
+    public APoint3D getPoint3D(String field) throws AQLJException {
+        return (APoint3D) fieldAs(field, ATypeTag.POINT3D);
+    }
+
+    @Override
+    public APolygon getPolygon() throws AQLJException {
+        return (APolygon) currentAs(ATypeTag.POLYGON);
+    }
+
+    @Override
+    public APolygon getPolygon(String field) throws AQLJException {
+        return (APolygon) fieldAs(field, ATypeTag.POLYGON);
+    }
+
+    @Override
+    public ARectangle getRectangle() throws AQLJException {
+        return (ARectangle) currentAs(ATypeTag.RECTANGLE);
+    }
+
+    @Override
+    public ARectangle getRectangle(String field) throws AQLJException {
+        return (ARectangle) fieldAs(field, ATypeTag.RECTANGLE);
+    }
+
+    @Override
+    public AString getString() throws AQLJException {
+        return (AString) currentAs(ATypeTag.STRING);
+    }
+
+    @Override
+    public AString getString(String field) throws AQLJException {
+        return (AString) fieldAs(field, ATypeTag.STRING);
+    }
+
+    @Override
+    public ATime getTime() throws AQLJException {
+        return (ATime) currentAs(ATypeTag.TIME);
+    }
+
+    @Override
+    public ATime getTime(String field) throws AQLJException {
+        return (ATime) fieldAs(field, ATypeTag.TIME);
+    }
+
+    @Override
+    public IAType getType() {
+        if (currentObject != null) {
+            return currentObject.getType();
+        }
+        return null;
+    }
+
+    @Override
+    public IAObject get() {
+        return currentObject;
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJClientDriver.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJClientDriver.java
new file mode 100644
index 0000000..a1a077b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJClientDriver.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+
+/**
+ * Entry point for clients: hands out connections to an ASTERIX server without
+ * exposing the concrete connection class.
+ *
+ * @author zheilbron
+ */
+public class AQLJClientDriver {
+    /**
+     * Opens a connection to the ASTERIX server.
+     *
+     * @param host
+     *            the ip or hostname of the ASTERIX server
+     * @param port
+     *            the port of the ASTERIX server (default: 14600)
+     * @param dataverse
+     *            the name of the dataverse to use for any AQL statements
+     * @return an IAQLJConnection object representing the connection to ASTERIX
+     * @throws AQLJException
+     *             if the connection cannot be created
+     */
+    public static IAQLJConnection getConnection(String host, int port, String dataverse) throws AQLJException {
+        final IAQLJConnection connection = new AQLJConnection(host, port, dataverse);
+        return connection;
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJConnection.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJConnection.java
new file mode 100644
index 0000000..c597450
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJConnection.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJProtocol;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJStream;
+
+/**
+ * This class is the implementation of IAQLJConnection and is the means for
+ * communication between a client and an ASTERIX server. The messages passed
+ * through this connection conform to the AQLJ protocol: every message starts
+ * with a 4-byte length and a 1-byte message type.
+ *
+ * @author zheilbron
+ */
+public class AQLJConnection implements IAQLJConnection {
+    /** Largest byte count a 16-bit string length prefix can describe. */
+    private static final int MAX_STRING_BYTES = 0xFFFF;
+
+    private final String dataverse;
+    private final AQLJStream aqljStream;
+
+    /**
+     * Opens the stream to the server and performs the startup handshake.
+     *
+     * @param host
+     *            the ip or hostname of the ASTERIX server
+     * @param port
+     *            the port of the ASTERIX server
+     * @param dataverse
+     *            the dataverse announced to the server in the startup message
+     * @throws AQLJException
+     *             if the stream cannot be opened or the handshake fails
+     */
+    public AQLJConnection(String host, int port, String dataverse) throws AQLJException {
+        this.dataverse = dataverse;
+
+        try {
+            aqljStream = new AQLJStream(host, port);
+        } catch (IOException e) {
+            // NOTE(review): the IOException is dropped here; chain it as the
+            // cause if AQLJException offers a (String, Throwable) constructor.
+            throw new AQLJException("Could not connect to " + host + ":" + port);
+        }
+
+        // If the handshake fails nobody ever sees this object, so the stream
+        // must be released here or it leaks.
+        boolean started = false;
+        try {
+            startup();
+            started = true;
+        } finally {
+            if (!started) {
+                closeQuietly();
+            }
+        }
+    }
+
+    /** Closes the stream, ignoring failures; used only on error paths. */
+    private void closeQuietly() {
+        try {
+            aqljStream.close();
+        } catch (IOException ignored) {
+            // best effort: the original failure is the one worth reporting
+        }
+    }
+
+    /** Sends the startup message and waits for the server's answer. */
+    private void startup() throws AQLJException {
+        sendStartupMessage(dataverse);
+        getStartupResponse();
+    }
+
+    /**
+     * Rejects payloads whose length does not fit the 16-bit length prefix;
+     * sending them would put a wrong length on the wire.
+     */
+    private static void checkStringLength(int byteLength, String what) throws AQLJException {
+        if (byteLength > MAX_STRING_BYTES) {
+            throw new AQLJException(what + " is too long: " + byteLength + " bytes (maximum is "
+                    + MAX_STRING_BYTES + ")");
+        }
+    }
+
+    private void sendStartupMessage(String dataverse) throws AQLJException {
+        try {
+            byte[] dvBytes = dataverse.getBytes("UTF-8");
+            checkStringLength(dvBytes.length, "dataverse name");
+            // 4 for the message length, 1 for the message type, 2 for the
+            // string length
+            aqljStream.sendUnsignedInt32(4 + 1 + 2 + dvBytes.length);
+            aqljStream.sendChar(AQLJProtocol.STARTUP_MESSAGE);
+            aqljStream.sendInt16(dvBytes.length);
+            aqljStream.send(dvBytes);
+            aqljStream.flush();
+        } catch (IOException e) {
+            throw new AQLJException(e);
+        }
+    }
+
+    /**
+     * Reads the server's reply to the startup message.
+     *
+     * @throws AQLJException
+     *             if the server answers with an error or an unknown message,
+     *             or if reading fails
+     */
+    private void getStartupResponse() throws AQLJException {
+        try {
+            // The message length is read to consume it; its value is not used.
+            aqljStream.receiveUnsignedInt32();
+            int messageType = aqljStream.receiveChar();
+            switch (messageType) {
+                case AQLJProtocol.READY_MESSAGE:
+                    break;
+                case AQLJProtocol.ERROR_MESSAGE:
+                    String err = aqljStream.receiveString();
+                    throw new AQLJException(err);
+                default:
+                    throw new AQLJException("Error: unable to parse message from server");
+            }
+        } catch (IOException e) {
+            throw new AQLJException(e);
+        }
+    }
+
+    @Override
+    public IAQLJResult execute(String stmt) throws AQLJException {
+        sendExecute(stmt);
+        return fetchResults();
+    }
+
+    /**
+     * Reads messages until the server reports completion, appending every data
+     * message to a result buffer.
+     *
+     * @return the result wrapping the buffer; the buffer is null when no data
+     *         message was received
+     * @throws AQLJException
+     *             if the server reports an error, sends an unknown message, or
+     *             reading fails
+     */
+    private AQLJResult fetchResults() throws AQLJException {
+        ResultBuffer rb = null;
+        boolean completed = false;
+        try {
+            while (true) {
+                long len = aqljStream.receiveUnsignedInt32();
+                int messageType = aqljStream.receiveChar();
+                switch (messageType) {
+                    case AQLJProtocol.DATA_MESSAGE:
+                        // DataRecord
+                        if (rb == null) {
+                            rb = new ResultBuffer();
+                        }
+                        // 5 = 4 bytes of length + 1 byte of type already read
+                        rb.appendMessage(aqljStream, (int) (len - 5));
+                        break;
+                    case AQLJProtocol.EXECUTE_COMPLETE_MESSAGE:
+                        // ExecuteComplete
+                        completed = true;
+                        return new AQLJResult(rb);
+                    case AQLJProtocol.ERROR_MESSAGE:
+                        // Error
+                        throw new AQLJException(aqljStream.receiveString());
+                    default:
+                        throw new AQLJException("Error: received unknown message type from server");
+                }
+            }
+        } catch (IOException e) {
+            throw new AQLJException(e);
+        } finally {
+            // On any failure the half-filled buffer is never handed to the
+            // caller, so it has to be released here.
+            if (!completed && rb != null) {
+                try {
+                    rb.close();
+                } catch (IOException ignored) {
+                    // best effort: keep the original failure
+                }
+            }
+        }
+    }
+
+    private void sendExecute(String stmt) throws AQLJException {
+        try {
+            byte[] stmtBytes = stmt.getBytes("UTF-8");
+            checkStringLength(stmtBytes.length, "statement");
+            // 4 for the message length, 1 for the message type, 2 for the
+            // string length
+            aqljStream.sendUnsignedInt32(4 + 1 + 2 + stmtBytes.length);
+            aqljStream.sendChar(AQLJProtocol.EXECUTE_MESSAGE);
+            aqljStream.sendInt16(stmtBytes.length);
+            aqljStream.send(stmtBytes);
+            aqljStream.flush();
+        } catch (IOException e) {
+            throw new AQLJException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        aqljStream.close();
+    }
+
+    @Override
+    public IADMCursor createADMCursor() {
+        return new ADMCursor(null);
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJResult.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJResult.java
new file mode 100644
index 0000000..63114ce
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/AQLJResult.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+
+/**
+ * This class is special type of ADMCursor in that it has a result buffer
+ * associated with it. It can be thought of as the "base" cursor for some ADM
+ * results: each call to {@link #next()} pulls the next object out of the
+ * buffer.
+ *
+ * @author zheilbron
+ */
+public class AQLJResult extends ADMCursor implements IAQLJResult {
+    /** Buffered results; null when the statement produced no data. */
+    private final ResultBuffer resultBuffer;
+
+    /**
+     * @param buffer
+     *            the buffered results, or null for an empty result
+     */
+    public AQLJResult(ResultBuffer buffer) {
+        super(null);
+        this.resultBuffer = buffer;
+    }
+
+    /**
+     * Moves to the next buffered object.
+     *
+     * @return true if an object was read, false once the results are exhausted
+     *         or when there is no buffer at all
+     * @throws AQLJException
+     */
+    @Override
+    public boolean next() throws AQLJException {
+        if (resultBuffer == null) {
+            // An empty result used to fail with a NullPointerException here.
+            currentObject = null;
+            return false;
+        }
+        currentObject = resultBuffer.get();
+        return currentObject != null;
+    }
+
+    /**
+     * Releases the result buffer, if there is one.
+     *
+     * @throws IOException
+     *             if closing the buffer fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (resultBuffer != null) {
+            resultBuffer.close();
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IADMCursor.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IADMCursor.java
new file mode 100644
index 0000000..a7500c9
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IADMCursor.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+import edu.uci.ics.asterix.om.base.ABinary;
+import edu.uci.ics.asterix.om.base.ABitArray;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.ACircle;
+import edu.uci.ics.asterix.om.base.ADate;
+import edu.uci.ics.asterix.om.base.ADateTime;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.ADuration;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt16;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AInt8;
+import edu.uci.ics.asterix.om.base.ALine;
+import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.APoint3D;
+import edu.uci.ics.asterix.om.base.APolygon;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.ATime;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.IAType;
+
+/**
+ * The mechanism by which results are iterated over. Results from ASTERIX may
+ * come in the form of a set of objects which may either be primitives (e.g.
+ * int, string, ...), collections (e.g. ordered lists, unordered lists, ...),
+ * records, or some combination thereof.
+ * <p>
+ * The getters come in pairs. {@code get<Type>()} returns the object the cursor
+ * currently points at as that type, and {@code get<Type>(String field)}
+ * returns the named field of the record the cursor currently points at. In the
+ * ADMCursor implementation both throw an AQLJException when the object is not
+ * of the requested type, and the field variant also does so when the current
+ * object is not a record.
+ *
+ * @author zheilbron
+ */
+public interface IADMCursor {
+ public ABinary getBinary() throws AQLJException;
+
+ public ABinary getBinary(String field) throws AQLJException;
+
+ public ABitArray getBitArray() throws AQLJException;
+
+ public ABitArray getBitArray(String field) throws AQLJException;
+
+ public ABoolean getBoolean() throws AQLJException;
+
+ public ABoolean getBoolean(String field) throws AQLJException;
+
+ public ACircle getCircle() throws AQLJException;
+
+ public ACircle getCircle(String field) throws AQLJException;
+
+ public ADate getDate() throws AQLJException;
+
+ public ADate getDate(String field) throws AQLJException;
+
+ public ADateTime getDateTime() throws AQLJException;
+
+ public ADateTime getDateTime(String field) throws AQLJException;
+
+ public ADouble getDouble() throws AQLJException;
+
+ public ADouble getDouble(String field) throws AQLJException;
+
+ public ADuration getDuration() throws AQLJException;
+
+ public ADuration getDuration(String field) throws AQLJException;
+
+ public AFloat getFloat() throws AQLJException;
+
+ public AFloat getFloat(String field) throws AQLJException;
+
+ public AInt8 getInt8() throws AQLJException;
+
+ public AInt8 getInt8(String field) throws AQLJException;
+
+ public AInt16 getInt16() throws AQLJException;
+
+ public AInt16 getInt16(String field) throws AQLJException;
+
+ public AInt32 getInt32() throws AQLJException;
+
+ public AInt32 getInt32(String field) throws AQLJException;
+
+ public AInt64 getInt64() throws AQLJException;
+
+ public AInt64 getInt64(String field) throws AQLJException;
+
+ public ALine getLine() throws AQLJException;
+
+ public ALine getLine(String field) throws AQLJException;
+
+ public APoint getPoint() throws AQLJException;
+
+ public APoint getPoint(String field) throws AQLJException;
+
+ public APoint3D getPoint3D() throws AQLJException;
+
+ public APoint3D getPoint3D(String field) throws AQLJException;
+
+ public APolygon getPolygon() throws AQLJException;
+
+ public APolygon getPolygon(String field) throws AQLJException;
+
+ public ARectangle getRectangle() throws AQLJException;
+
+ public ARectangle getRectangle(String field) throws AQLJException;
+
+ public AString getString(String field) throws AQLJException;
+
+ public AString getString() throws AQLJException;
+
+ public ATime getTime() throws AQLJException;
+
+ public ATime getTime(String field) throws AQLJException;
+
+ /**
+ * Advances the cursor to the next object.
+ *
+ * @return true if the cursor points to an object after the call, false if
+ * there is nothing further to read
+ * @throws AQLJException
+ */
+ public boolean next() throws AQLJException;
+
+ /**
+ * Positions the cursor c on the object pointed to by this cursor.
+ *
+ * @param c
+ * the cursor to position
+ * @throws AQLJException
+ */
+ public void position(IADMCursor c) throws AQLJException;
+
+ /**
+ * Positions the cursor c on the object associated with the given field of
+ * the record pointed to by this cursor.
+ *
+ * @param c
+ * the cursor to position
+ * @param field
+ * the field name
+ * @throws AQLJException
+ */
+ public void position(IADMCursor c, String field) throws AQLJException;
+
+ /**
+ * Returns the type of the current object being pointed at, which may be
+ * null.
+ *
+ * @return the type of the current object
+ */
+ public IAType getType();
+
+ /**
+ * Returns the current object being pointed at, which may be null.
+ *
+ * @return the current object
+ */
+ public IAObject get();
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJConnection.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJConnection.java
new file mode 100644
index 0000000..8fdf59d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJConnection.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+
+/**
+ * The connection (session) that serves as the context for communicating with
+ * ASTERIX. Statements are run with {@link #execute(String)}, and the
+ * connection should be released with {@link #close()} when no longer needed.
+ *
+ * @author zheilbron
+ */
+public interface IAQLJConnection {
+ /**
+ * Execute an AQL statement that returns an IAQLJResult. The IAQLJResult
+ * will contain all associated results of the AQL statement.
+ *
+ * @param stmt
+ * the AQL statement
+ * @return the results of the AQL statement as an IAQLJResult
+ * @throws AQLJException
+ * if the statement cannot be sent, the server reports an error,
+ * or the response cannot be read
+ */
+ public IAQLJResult execute(String stmt) throws AQLJException;
+
+ /**
+ * Create a cursor to iterate over results. The cursor points at nothing
+ * until another cursor positions it via one of the position(...) calls.
+ *
+ * @return an unpositioned cursor
+ */
+ public IADMCursor createADMCursor();
+
+ /**
+ * Close the connection with the server.
+ *
+ * @throws IOException
+ * if closing the underlying stream fails
+ */
+ public void close() throws IOException;
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJResult.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJResult.java
new file mode 100644
index 0000000..b28b3c6
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/IAQLJResult.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import java.io.IOException;
+
+/**
+ * The results associated with an AQL statement. A result is itself a cursor
+ * and additionally owns the buffer holding the results, which is why it must
+ * be closed.
+ *
+ * @author zheilbron
+ */
+public interface IAQLJResult extends IADMCursor {
+ /**
+ * Close the cursor and discard any associated results.
+ * It's important to ensure that this method is called in order to free up
+ * the associated result buffer.
+ *
+ * @throws IOException
+ * if releasing the result buffer fails
+ */
+ public void close() throws IOException;
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ResultBuffer.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ResultBuffer.java
new file mode 100644
index 0000000..9fddad5
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/client/ResultBuffer.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.client;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJStream;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This class supports the buffering of results that are received from the
+ * server. The results are buffered completely to a file on disk. The results
+ * that are sent back should contain a serde in order to read the results back
+ * in. To see the expected format refer to {@link edu.uci.ics.hyracks.algebricks.core.algebra.runtime.writers.SerializedDataWriterFactory}.
+ *
+ * @author zheilbron
+ */
+public class ResultBuffer {
+    private static final Logger LOGGER = Logger.getLogger(ResultBuffer.class.getName());
+
+    private static final int BUF_SIZE = 8192;
+
+    private final byte[] buffer;
+    private final File tmpFile;
+    private final FileOutputStream fos;
+    private final FileInputStream fis;
+    private final DataInputStream dis;
+
+    private ObjectInputStream ois;
+    private ISerializerDeserializer serde;
+
+    public ResultBuffer() throws IOException {
+        buffer = new byte[BUF_SIZE];
+        tmpFile = File.createTempFile("aqlj", null, new File(System.getProperty("java.io.tmpdir")));
+        fos = new FileOutputStream(tmpFile);
+        fis = new FileInputStream(tmpFile);
+        dis = new DataInputStream(fis);
+        serde = null;
+    }
+
+    /** Reads the serialized {@link RecordDescriptor} from the current read position of the temporary file. */
+    private RecordDescriptor getRecordDescriptor() throws AQLJException {
+        try {
+            ois = new ObjectInputStream(fis);
+            return (RecordDescriptor) ois.readObject();
+        } catch (IOException e) {
+            throw new AQLJException(e);
+        } catch (ClassNotFoundException e) {
+            throw new AQLJException(e);
+        }
+    }
+
+    /** Returns the next buffered result, or {@code null} once no further result can be deserialized. */
+    public IAObject get() throws AQLJException {
+        if (serde == null) {
+            // on first use, read the record descriptor; its first field is the serde
+            serde = getRecordDescriptor().getFields()[0];
+        }
+        try {
+            return (IAObject) serde.deserialize(dis);
+        } catch (HyracksDataException e) {
+            // this is expected behavior: the end of the results shows up as an
+            // EOFException that is masked by the HyracksDataException
+            return null;
+        }
+    }
+
+    /** Copies the next {@code len} bytes from {@code aqljStream} to the end of the temporary file. */
+    public void appendMessage(AQLJStream aqljStream, long len) throws IOException {
+        long pos = 0;
+        while (pos < len) {
+            int chunk = (int) Math.min(BUF_SIZE, len - pos);
+            aqljStream.receive(buffer, 0, chunk);
+            fos.write(buffer, 0, chunk);
+            pos += chunk;
+        }
+    }
+
+    /**
+     * Closes all streams on the temporary file and then deletes it. Every
+     * step is attempted even if an earlier one fails; the file is removed
+     * last because a file that is still open cannot be deleted everywhere.
+     */
+    public void close() throws IOException {
+        try {
+            fos.close();
+        } finally {
+            try {
+                // ois and dis both wrap fis, so closing them also closes fis
+                if (ois != null) {
+                    ois.close();
+                }
+            } finally {
+                try {
+                    dis.close();
+                } finally {
+                    if (tmpFile.exists() && !tmpFile.delete()) {
+                        LOGGER.warning("Unable to delete temporary result file " + tmpFile.getAbsolutePath());
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJException.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJException.java
new file mode 100644
index 0000000..7c19a56
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.common;
+
+/**
+ * This is the base (and currently the only) exception class for AQLJ.
+ *
+ * @author zheilbron
+ */
+public class AQLJException extends Exception {
+    private static final long serialVersionUID = 1L;
+    /** Creates an exception that carries only a detail message. */
+    public AQLJException(String message) {
+        super(message);
+    }
+    /** Creates an exception that wraps {@code cause} without an explicit message. */
+    public AQLJException(Throwable cause) {
+        super(cause);
+    }
+    /** Creates an exception with both a detail message and the underlying cause. */
+    public AQLJException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJProtocol.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJProtocol.java
new file mode 100644
index 0000000..83ef0e5
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJProtocol.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.common;
+
+/**
+ * This class provides constants for message types in the AQLJ protocol.
+ *
+ * @author zheilbron
+ */
+public abstract class AQLJProtocol {
+    public static final char EXECUTE_COMPLETE_MESSAGE = 'C'; // a statement or a result transfer has finished
+    public static final char DATA_MESSAGE = 'D'; // carries a chunk of result bytes
+    public static final char ERROR_MESSAGE = 'E'; // carries an error text
+    public static final char GET_RESULTS_MESSAGE = 'G'; // asks a node for the output file at a given path
+    public static final char READY_MESSAGE = 'R'; // positive answer to a Startup message
+    public static final char STARTUP_MESSAGE = 'S'; // opens a session and names the dataverse
+    public static final char EXECUTE_MESSAGE = 'X'; // carries an AQL statement to run
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJStream.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJStream.java
new file mode 100644
index 0000000..c595284
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/common/AQLJStream.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.common;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * This class provides a clean mechanism for sending and receiving the data
+ * types involved in the AQLJ protocol.
+ *
+ * @author zheilbron
+ */
+public class AQLJStream {
+    private static final int BUF_SIZE = 8192;
+
+    private final String host;
+    private final int port;
+    private final Socket connection;
+    private final BufferedInputStream aqljInput;
+    private final BufferedOutputStream aqljOutput;
+
+    private final byte[] int16Buf = new byte[2];
+    private final byte[] int32Buf = new byte[4];
+
+    public AQLJStream(String host, int port) throws IOException {
+        this(host, port, new Socket(host, port));
+    }
+
+    /** Wraps an existing socket; {@link #getHost()} is then {@code null} and {@link #getPort()} is 0. */
+    public AQLJStream(Socket sock) throws IOException {
+        this(null, 0, sock);
+    }
+
+    private AQLJStream(String host, int port, Socket sock) throws IOException {
+        this.host = host;
+        this.port = port;
+        this.connection = sock;
+        this.aqljInput = new BufferedInputStream(sock.getInputStream(), BUF_SIZE);
+        this.aqljOutput = new BufferedOutputStream(sock.getOutputStream(), BUF_SIZE);
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public Socket getSocket() {
+        return connection;
+    }
+
+    /** Blocks until exactly {@code len} bytes are stored in {@code buf} from {@code off}; throws {@link EOFException} if the stream ends first. */
+    public void receive(byte[] buf, int off, int len) throws IOException {
+        int count = 0;
+        while (count < len) {
+            int read = aqljInput.read(buf, off + count, len - count);
+            if (read < 0) {
+                throw new EOFException();
+            }
+            count += read;
+        }
+    }
+
+    public byte[] receive(int len) throws IOException {
+        byte[] result = new byte[len];
+        receive(result, 0, len);
+        return result;
+    }
+
+    /** Reads a big-endian 16-bit value and returns it as a non-negative int. */
+    public int receiveInt16() throws IOException {
+        // a single read() may return fewer bytes than asked for, so fill the buffer via receive()
+        receive(int16Buf, 0, int16Buf.length);
+        return (int16Buf[0] & 0xff) << 8 | (int16Buf[1] & 0xff);
+    }
+
+    /** Reads a big-endian 32-bit value and returns it as an unsigned number held in a long. */
+    public long receiveUnsignedInt32() throws IOException {
+        receive(int32Buf, 0, int32Buf.length);
+        return ((int32Buf[0] & 0xff) << 24 | (int32Buf[1] & 0xff) << 16 | (int32Buf[2] & 0xff) << 8 | (int32Buf[3] & 0xff)) & 0xffffffffL;
+    }
+
+    public int receiveChar() throws IOException {
+        int c = aqljInput.read();
+        if (c < 0) {
+            throw new EOFException();
+        }
+        return c;
+    }
+
+    public String receiveString() throws IOException {
+        int strlen = receiveInt16();
+        return new String(receive(strlen), "UTF8");
+    }
+
+    public void send(byte[] buf) throws IOException {
+        aqljOutput.write(buf);
+    }
+
+    public void send(byte[] buf, int off, int len) throws IOException {
+        aqljOutput.write(buf, off, len);
+    }
+
+    public void sendInt16(int val) throws IOException {
+        int16Buf[0] = (byte) (val >>> 8);
+        int16Buf[1] = (byte) (val);
+        aqljOutput.write(int16Buf);
+    }
+
+    public void sendUnsignedInt32(long val) throws IOException {
+        int32Buf[0] = (byte) (val >>> 24);
+        int32Buf[1] = (byte) (val >>> 16);
+        int32Buf[2] = (byte) (val >>> 8);
+        int32Buf[3] = (byte) (val);
+        aqljOutput.write(int32Buf);
+    }
+
+    public void sendChar(int c) throws IOException {
+        aqljOutput.write(c);
+    }
+
+    /** Sends a 16-bit length prefix followed by the bytes; longer strings are rejected instead of corrupting the stream. */
+    public void sendString(byte[] strBytes) throws IOException {
+        if (strBytes.length > 0xffff) {
+            throw new IOException("string of " + strBytes.length + " bytes does not fit a 16-bit length prefix");
+        }
+        sendInt16(strBytes.length);
+        send(strBytes);
+    }
+
+    public void sendString(String str) throws IOException {
+        sendString(str.getBytes("UTF8"));
+    }
+
+    public void flush() throws IOException {
+        aqljOutput.flush();
+    }
+
+    /** Closes the output first (so buffered bytes can still be flushed), then the input and the socket, attempting each step. */
+    public void close() throws IOException {
+        try {
+            aqljOutput.close();
+        } finally {
+            try {
+                aqljInput.close();
+            } finally {
+                connection.close();
+            }
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThread.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThread.java
new file mode 100644
index 0000000..50ed4d1
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThread.java
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.json.JSONException;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJException;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJProtocol;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJStream;
+import edu.uci.ics.asterix.api.common.APIFramework;
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.AsterixHyracksIntegrationUtil;
+import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.hyracks.bootstrap.AsterixNodeState;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * This class is the client handler for the APIServer. The AQLJ protocol is used
+ * for communicating with the client. The client, for example, may send a
+ * message to execute an AQL statement. It is up to this class to process that
+ * AQL statement and pass back the results, if any, to the client.
+ *
+ * @author zheilbron
+ */
+public class APIClientThread extends Thread {
+    private static final Logger LOGGER = Logger.getLogger(APIClientThread.class.getName());
+
+    private static final int RESULT_BUF_SIZE = 8192;
+
+    private final ICCApplicationContext appContext;
+    private final AQLJStream clientStream;
+    private final String outputFilePath;
+    private final String outputNodeName;
+    private final String outputNodeIP;
+    private final String binaryOutputClause;
+
+    private AQLJStream nodeDataServerStream;
+    private int nodeDataServerPort;
+    private String dataverse;
+
+    /**
+     * Prepares the handler for one client connection: picks the node that will
+     * hold query output, looks up that node's data server port and builds the
+     * "write output" clause that is prepended to every statement.
+     */
+    public APIClientThread(Socket clientSocket, ICCApplicationContext appCtx) throws IOException {
+        clientStream = new AQLJStream(clientSocket);
+        this.appContext = appCtx;
+
+        // get the name of the first node controller that we find
+        // all query results will be written to this node
+        Map<String, Set<String>> nodeNameMap = new HashMap<String, Set<String>>();
+        try {
+            this.appContext.getCCContext().getIPAddressNodeMap(nodeNameMap);
+        } catch (Exception e) {
+            throw new IOException(" unable to obtain IP address node map", e);
+        }
+        if (nodeNameMap.isEmpty()) {
+            throw new IOException("no node controller is available to receive query results");
+        }
+        outputNodeIP = (String) nodeNameMap.keySet().toArray()[0];
+        outputNodeName = (String) nodeNameMap.get(outputNodeIP).toArray()[0];
+
+        // get the port of the node data server that is running on the first nc
+        IAsterixStateProxy proxy = (IAsterixStateProxy) appCtx.getDistributedState();
+        nodeDataServerPort = ((AsterixNodeState) proxy.getAsterixNodeState(outputNodeName)).getAPINodeDataServerPort();
+        nodeDataServerStream = null;
+
+        // write the data into the output stores directory of the nc; if no
+        // output store is known for it, write to tmpdir (which can be overridden).
+        // Milliseconds in the name of the output file differentiate queries.
+        Map<String, String[]> storesMap = AsterixProperties.INSTANCE.getStores();
+        String[] outputStores = storesMap.get(outputNodeName);
+        String fileName = System.currentTimeMillis() + ".adm";
+        if (outputStores != null && outputStores.length > 0) {
+            outputFilePath = outputStores[0] + fileName;
+        } else {
+            // File.separator joins path elements; File.pathSeparator delimits path lists
+            outputFilePath = System.getProperty("java.io.tmpdir") + File.separator + fileName;
+        }
+
+        // the "write output..." clause is inserted into incoming AQL statements
+        binaryOutputClause = "write output to " + outputNodeName + ":\"" + outputFilePath
+                + "\" using \"edu.uci.ics.hyracks.algebricks.core.algebra.runtime.writers.SerializedDataWriterFactory\";";
+    }
+
+    /**
+     * Reads the client's first message and remembers the dataverse it names.
+     * Answers Ready for a Startup message and Error for anything else.
+     * @return {@code true} if the handshake succeeded and the session may go on
+     */
+    private boolean startup() throws IOException {
+        clientStream.receiveUnsignedInt32();
+        int messageType = clientStream.receiveChar();
+        dataverse = clientStream.receiveString();
+        if (messageType == AQLJProtocol.STARTUP_MESSAGE) {
+            sendReady();
+            return true;
+        }
+        LOGGER.warning("Error: received message other than Startup. Exiting.");
+        sendError("startup failed: no Startup message received");
+        return false;
+    }
+
+    /**
+     * Serves the client: performs the startup handshake and then handles one
+     * message at a time until the stream fails or ends, the thread is
+     * interrupted, or a message of unknown type arrives.
+     */
+    public void run() {
+        try {
+            // startup phase; a failed handshake ends the session
+            if (!startup()) {
+                close();
+                return;
+            }
+
+            // normal execution phase
+            while (true) {
+                // check if we should close
+                if (Thread.interrupted()) {
+                    close();
+                    return;
+                }
+
+                clientStream.receiveUnsignedInt32();
+                int messageType = clientStream.receiveChar();
+                if (messageType == AQLJProtocol.EXECUTE_MESSAGE) {
+                    handleExecute();
+                } else {
+                    String err = "Error: received unknown message of type " + (char) messageType;
+                    sendError(err);
+                    LOGGER.severe(err);
+                    close();
+                    return;
+                }
+            }
+        } catch (IOException e) {
+            // the normal path that is taken when exiting
+            close();
+        }
+    }
+
+    /** Handles one Execute message: reads the statement, runs it and reports the outcome to the client. */
+    private void handleExecute() throws IOException {
+        String query = clientStream.receiveString();
+        String fullQuery = "use dataverse " + dataverse + ";\n" + binaryOutputClause + '\n' + query;
+
+        String outputPath;
+        try {
+            outputPath = executeStatement(fullQuery);
+        } catch (AQLJException e) {
+            LOGGER.severe("Error occurred while executing query: " + fullQuery);
+            LOGGER.severe(e.getMessage());
+            sendError(e.getMessage());
+            return;
+        }
+
+        if (outputPath == null) {
+            // The query ran, but produced no results. This means cardinality of
+            // the result is 0 or "actions" were performed, where actions are
+            // things like create type, create dataset, etc.
+            sendExecuteComplete();
+        } else if (sendResults(outputPath)) {
+            // there were some results and they were sent back to the client
+            sendExecuteComplete();
+        } else {
+            String err = "Error: unable to retrieve results from " + outputNodeName;
+            LOGGER.severe(err);
+            sendError(err);
+        }
+    }
+
+    /** Closes the node data server stream (if one is open) and the client stream, logging any failure. */
+    private void close() {
+        if (nodeDataServerStream != null) {
+            try {
+                nodeDataServerStream.close();
+            } catch (IOException e) {
+                LOGGER.severe("Error closing NodeData AQLJStream");
+                LOGGER.severe(e.getMessage());
+            }
+        }
+        try {
+            clientStream.close();
+        } catch (IOException e) {
+            LOGGER.severe("Error closing client AQLJStream");
+            LOGGER.severe(e.getMessage());
+        }
+    }
+
+    /**
+     * Parses, compiles and runs one statement.
+     *
+     * @return the absolute path of the output file, or {@code null} if the
+     *         compiler returned no metadata declarations
+     * @throws AQLJException wrapping any failure, so the caller reports it once
+     */
+    private String executeStatement(String stmt) throws IOException, AQLJException {
+        PrintWriter out = new PrintWriter(System.out);
+        AqlCompiledMetadataDeclarations metadata = null;
+        try {
+            AQLParser parser = new AQLParser(new StringReader(stmt));
+            Query q = (Query) parser.Statement();
+            SessionConfig pc = new SessionConfig(AsterixHyracksIntegrationUtil.DEFAULT_HYRACKS_CC_CLIENT_PORT, true,
+                    false, false, false, false, false, false);
+            pc.setGenerateJobSpec(true);
+
+            MetadataManager.INSTANCE.init();
+            if (q != null) {
+                String ddlDataverse = APIFramework.compileDdlStatements(q, out, pc, DisplayFormat.TEXT);
+                Job[] dmlJobs = APIFramework.compileDmlStatements(ddlDataverse, q, out, pc, DisplayFormat.TEXT);
+                APIFramework.executeJobArray(dmlJobs, pc.getPort(), out, DisplayFormat.TEXT);
+            }
+
+            Pair<AqlCompiledMetadataDeclarations, JobSpecification> metadataAndSpec = APIFramework.compileQuery(
+                    dataverse, q, parser.getVarCounter(), null, metadata, pc, out, DisplayFormat.TEXT, null);
+            JobSpecification spec = metadataAndSpec.second;
+            metadata = metadataAndSpec.first;
+            APIFramework.executeJobArray(new JobSpecification[] { spec },
+                    AsterixHyracksIntegrationUtil.DEFAULT_HYRACKS_CC_CLIENT_PORT, out, DisplayFormat.TEXT);
+        } catch (Exception e) {
+            // parse, compile and runtime failures are all surfaced the same way; the
+            // caller sends the single Error message, so nothing is sent from here
+            e.printStackTrace();
+            throw new AQLJException(e);
+        }
+
+        if (metadata == null) {
+            return null;
+        }
+        return metadata.getOutputFile().getLocalFile().getFile().getAbsolutePath();
+    }
+
+    /**
+     * Requests the query output from the node data server and relays each Data
+     * message to the client. The {@code path} argument is currently unused.
+     * @return {@code true} if the node ended the transfer with ExecuteComplete
+     */
+    private boolean sendResults(String path) throws IOException {
+        byte[] buf = new byte[RESULT_BUF_SIZE];
+
+        if (nodeDataServerStream == null) {
+            nodeDataServerStream = new AQLJStream(outputNodeIP, nodeDataServerPort);
+        }
+        sendGetResults(nodeDataServerStream);
+
+        // forward data packets from the nodedataservers through this server to
+        // the client
+        while (true) {
+            long len = nodeDataServerStream.receiveUnsignedInt32();
+            int messageType = nodeDataServerStream.receiveChar();
+            if ((char) messageType != AQLJProtocol.DATA_MESSAGE) {
+                nodeDataServerStream.close();
+                nodeDataServerStream = null;
+                return (char) messageType == AQLJProtocol.EXECUTE_COMPLETE_MESSAGE;
+            }
+
+            clientStream.sendUnsignedInt32(len);
+            clientStream.sendChar(AQLJProtocol.DATA_MESSAGE);
+            // the announced length includes the 5 header bytes; the payload still
+            // owed is tracked in its own counter so that every byte is relayed
+            long remaining = len - 5;
+            while (remaining > 0) {
+                int toSend = (int) Math.min(remaining, buf.length);
+                nodeDataServerStream.receive(buf, 0, toSend);
+                clientStream.send(buf, 0, toSend);
+                remaining -= toSend;
+            }
+            clientStream.flush();
+        }
+    }
+
+    private void sendGetResults(AQLJStream s) throws IOException {
+        byte[] pathBytes = outputFilePath.getBytes("UTF-8");
+        // 4 for the message length, 1 for the message type, 2 for the string
+        // length
+        s.sendUnsignedInt32(4 + 1 + 2 + pathBytes.length);
+        s.sendChar(AQLJProtocol.GET_RESULTS_MESSAGE);
+        // send the very bytes whose length was announced above
+        s.sendString(pathBytes);
+        s.flush();
+    }
+
+    private void sendReady() throws IOException {
+        // 4 for the message length and 1 for the message type (4 + 1 = 5)
+        clientStream.sendUnsignedInt32(5);
+        clientStream.sendChar(AQLJProtocol.READY_MESSAGE);
+        clientStream.flush();
+    }
+
+    private void sendError(String msg) throws IOException {
+        // an exception may carry no message; reporting a failure must not fail on that
+        byte[] msgBytes = (msg == null ? "unknown error" : msg).getBytes("UTF-8");
+        // 4 for the message length, 1 for the message type, 2 for the string
+        // length
+        clientStream.sendUnsignedInt32(4 + 1 + 2 + msgBytes.length);
+        clientStream.sendChar(AQLJProtocol.ERROR_MESSAGE);
+        clientStream.sendInt16(msgBytes.length);
+        clientStream.send(msgBytes);
+        clientStream.flush();
+    }
+
+    private void sendExecuteComplete() throws IOException {
+        // 4 for the message length and 1 for the message type (4 + 1 = 5)
+        clientStream.sendUnsignedInt32(5);
+        clientStream.sendChar(AQLJProtocol.EXECUTE_COMPLETE_MESSAGE);
+        clientStream.flush();
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThreadFactory.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThreadFactory.java
new file mode 100644
index 0000000..36f049c
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/APIClientThreadFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+
+/**
+ * This class is a factory for client handler threads of type {@link APIClientThread} and is used in conjunction with {@link ThreadedServer}.
+ *
+ * @author zheilbron
+ */
+public class APIClientThreadFactory implements IClientThreadFactory {
+    private final ICCApplicationContext ccAppContext;
+    /** Creates a factory whose handler threads all receive {@code appContext}. */
+    public APIClientThreadFactory(ICCApplicationContext appContext) {
+        ccAppContext = appContext;
+    }
+    // builds (but does not start) the handler for one client socket
+    @Override
+    public Thread createThread(Socket socket) throws IOException {
+        return new APIClientThread(socket, ccAppContext);
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/IClientThreadFactory.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/IClientThreadFactory.java
new file mode 100644
index 0000000..bca7f4d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/IClientThreadFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Implementing this interface allows a class such as {@link ThreadedServer} to
+ * spawn a particular type of thread to handle some client connection.
+ *
+ * @author zheilbron
+ */
+public interface IClientThreadFactory {
+    Thread createThread(Socket socket) throws IOException; // handler thread for one client socket
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThread.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThread.java
new file mode 100644
index 0000000..0246fd9
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThread.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.aqlj.common.AQLJProtocol;
+import edu.uci.ics.asterix.api.aqlj.common.AQLJStream;
+
+/**
+ * This class handles data requests from the APIServer. When a query is executed
+ * through the API, the output is written to the local disk of some NC. The
+ * APIServer will contact that NC and ask for the results of the query to be
+ * sent. This class handles such communication between the NC and APIServer.
+ *
+ * @author zheilbron
+ */
+public class NodeDataClientThread extends Thread {
+    private static final Logger LOGGER = Logger.getLogger(NodeDataClientThread.class.getName());
+
+    private static final int RESULT_BUFFER_SIZE = 8192;
+    // the 32-bit message length also counts the 5 header bytes, which caps the payload
+    private static final long MAX_PAYLOAD = 0xffffffffL - 5;
+
+    private final AQLJStream aqljStream;
+
+    public NodeDataClientThread(Socket clientSocket) throws IOException {
+        aqljStream = new AQLJStream(clientSocket);
+    }
+
+    public void run() {
+        try {
+            getFile();
+        } catch (IOException e) {
+            LOGGER.severe("I/O error occurred over AQLJStream (socket)");
+            LOGGER.severe(e.getMessage());
+        } finally {
+            close();
+        }
+    }
+
+    /**
+     * Answers one GetResults request: streams the named file to the peer as one
+     * or more Data messages, deletes the file and finishes with ExecuteComplete.
+     * Nothing is sent when the request is of another type or the file is
+     * missing; run() then simply closes the connection.
+     */
+    private void getFile() throws IOException {
+        aqljStream.receiveUnsignedInt32();
+        int type = aqljStream.receiveChar();
+        if ((char) type != AQLJProtocol.GET_RESULTS_MESSAGE) {
+            return;
+        }
+
+        // NOTE(review): the path comes verbatim from the peer and the file is deleted
+        // after sending; confirm that only trusted servers can reach this port
+        String path = aqljStream.receiveString();
+        File outputFile = new File(path);
+        FileInputStream fis;
+        try {
+            fis = new FileInputStream(outputFile);
+        } catch (FileNotFoundException e) {
+            LOGGER.warning("Error: requested file not found: " + path);
+            return;
+        }
+
+        try {
+            byte[] buf = new byte[RESULT_BUFFER_SIZE];
+            long remainingTotal = outputFile.length();
+            // the results may be large, so cram as much into a packet as possible
+            while (remainingTotal > 0) {
+                long payload = Math.min(remainingTotal, MAX_PAYLOAD);
+                sendDataMessage(fis, buf, payload);
+                remainingTotal -= payload;
+            }
+        } finally {
+            fis.close();
+        }
+        outputFile.delete();
+        aqljStream.sendUnsignedInt32(5);
+        aqljStream.sendChar(AQLJProtocol.EXECUTE_COMPLETE_MESSAGE);
+        aqljStream.flush();
+    }
+
+    /**
+     * Sends one Data message whose payload is the next {@code payload} bytes of {@code fis}.
+     */
+    private void sendDataMessage(FileInputStream fis, byte[] buf, long payload) throws IOException {
+        aqljStream.sendUnsignedInt32(4 + 1 + payload);
+        aqljStream.sendChar(AQLJProtocol.DATA_MESSAGE);
+        long sent = 0;
+        while (sent < payload) {
+            int n = fis.read(buf, 0, (int) Math.min(buf.length, payload - sent));
+            if (n < 0) {
+                throw new IOException("result file ended before the announced " + payload + " bytes were sent");
+            }
+            aqljStream.send(buf, 0, n);
+            sent += n;
+        }
+        aqljStream.flush();
+    }
+
+    private void close() {
+        try {
+            aqljStream.close();
+        } catch (IOException e) {
+            LOGGER.severe("Error closing AQLJStream");
+            LOGGER.severe(e.getMessage());
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThreadFactory.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThreadFactory.java
new file mode 100644
index 0000000..22efa89
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/NodeDataClientThreadFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * This class is a factory for client handler threads of type {@link NodeDataClientThread} and is used in conjunction with {@link ThreadedServer}.
+ *
+ * @author zheilbron
+ */
+public class NodeDataClientThreadFactory implements IClientThreadFactory {
+ @Override
+ public Thread createThread(Socket socket) throws IOException {
+ return new NodeDataClientThread(socket);
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/ThreadedServer.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/ThreadedServer.java
new file mode 100644
index 0000000..573d6a3
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/aqlj/server/ThreadedServer.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.aqlj.server;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.logging.Logger;
+
+/**
+ * This server is a multithreaded server that spawns one connection per client
+ * up to MAX_CLIENTS total clients. The type of thread spawned to handle each
+ * client request is delegated to a client thread factory that implements the
+ * IClientThreadFactory interface.
+ * NOTE: The "BE" in logging messages stands for "back-end". This is to
+ * differentiate from the "FE" or "front-end" when reviewing log messages.
+ *
+ * @author zheilbron
+ */
+public class ThreadedServer extends Thread {
+ private static Logger LOGGER = Logger.getLogger(ThreadedServer.class.getName());
+
+ private static final int MAX_CLIENTS = 10;
+
+ private final int port;
+ private final IClientThreadFactory factory;
+
+ private ServerSocket serverSocket;
+ private Socket clientSocket;
+ private Socket[] clientSockets;
+ private Thread[] threads;
+
+ public ThreadedServer(int port, IClientThreadFactory factory) {
+ this.port = port;
+ this.factory = factory;
+ this.clientSockets = new Socket[MAX_CLIENTS];
+ this.threads = new Thread[MAX_CLIENTS];
+ this.clientSocket = null;
+ }
+
+ public void run() {
+ try {
+ serverSocket = new ServerSocket(port);
+ } catch (IOException e) {
+ LOGGER.severe("Error listening on port: " + port);
+ LOGGER.severe(e.getMessage());
+ return;
+ }
+ LOGGER.info("Server started. Listening on port: " + port);
+
+ while (true) {
+ try {
+ clientSocket = serverSocket.accept();
+ } catch (SocketException e) {
+ // This is the normal path the server will take when exiting.
+ //
+ // In order to close the server down properly, the
+ // serverSocket.accept() call must
+ // be interrupted. The only way to interrupt the
+ // serverSocket.accept() call in the loop
+ // above is by calling serverSocket.close() (as is done in the
+ // ThreadedServer.shutdown() method
+ // below). The serverSocket.accept() then throws a
+ // SocketException, so we catch it here
+ // and assume that ThreadedServer.shutdown() was called.
+
+ return;
+ } catch (IOException e) {
+ LOGGER.severe("Failed to accept() connection");
+ LOGGER.severe(e.getMessage());
+ }
+
+ for (int i = 0; i < threads.length; i++) {
+ if (threads[i] == null || !threads[i].isAlive()) {
+ try {
+ threads[i] = factory.createThread(clientSocket);
+ } catch (IOException e) {
+ LOGGER.severe("Failed to create client handler thread");
+ LOGGER.severe(e.getMessage());
+ }
+ clientSockets[i] = clientSocket;
+ threads[i].start();
+ clientSocket = null;
+ break;
+ }
+ }
+
+ // setting the clientSocket to null is an indicator the there was
+ // room for the
+ // connection (i.e. the number of clients < MAX_CLIENTS). If it is
+ // not set, then
+ // there was no room for the connection, so the client is dropped.
+ if (clientSocket != null) {
+ try {
+ clientSocket.close();
+ } catch (IOException e) {
+ LOGGER.severe("Error closing (dropped) client socket.");
+ LOGGER.severe(e.getMessage());
+ }
+ LOGGER.warning("Client was dropped. Maximum number of connections reached!");
+ }
+ }
+ }
+
+ public void shutdown() {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ LOGGER.severe("Error closing server socket.");
+ LOGGER.severe(e.getMessage());
+ }
+
+ try {
+ for (int i = 0; i < threads.length; i++) {
+ if (threads[i] != null && threads[i].isAlive()) {
+ clientSockets[i].close();
+ threads[i].interrupt();
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.severe("Error closing client socket.");
+ LOGGER.severe(e.getMessage());
+ }
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
new file mode 100644
index 0000000..e060146
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -0,0 +1,592 @@
+package edu.uci.ics.asterix.api.common;
+
+import java.io.PrintWriter;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONException;
+
+import edu.uci.ics.asterix.api.common.Job.SubmissionMode;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.expression.visitor.AQLPrintVisitor;
+import edu.uci.ics.asterix.aql.rewrites.AqlRewriter;
+import edu.uci.ics.asterix.aql.translator.DdlTranslator;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.common.AqlExpressionTypeComputer;
+import edu.uci.ics.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory;
+import edu.uci.ics.asterix.dataflow.data.common.AqlNullableTypeComputer;
+import edu.uci.ics.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer;
+import edu.uci.ics.asterix.file.DatasetOperations;
+import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.file.IndexOperations;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.jobgen.AqlLogicalExpressionJobGen;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.optimizer.base.RuleCollections;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionIDFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.translator.AqlExpressionToPlanTranslator;
+import edu.uci.ics.asterix.translator.DmlTranslator;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledBeginFeedStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledControlFeedStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledDeleteStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledInsertStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledLoadFromFileStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledWriteFromQueryResultStatement;
+import edu.uci.ics.asterix.translator.DmlTranslator.ICompiledDmlStatement;
+import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class APIFramework {
+
+ /**
+  * Assembles the default sequence of logical rewrite stages. Each stage pairs
+  * a rule controller with one rule collection obtained from RuleCollections;
+  * the stages are returned in the order they are added here.
+  */
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildDefaultLogicalRewrites() {
+     List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> stages = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+
+     // The three controllers are shared between stages.
+     AbstractRuleController fixpointNoDfs = new SequentialFixpointRuleController(false);
+     AbstractRuleController fixpointFullDfs = new SequentialFixpointRuleController(true);
+     AbstractRuleController once = new SequentialOnceRuleController(true);
+
+     List<IAlgebraicRewriteRule> rules;
+
+     rules = RuleCollections.buildTypeInferenceRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(once, rules));
+
+     rules = RuleCollections.buildNormalizationRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointFullDfs, rules));
+
+     rules = RuleCollections.buildCondPushDownAndJoinInferenceRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointNoDfs, rules));
+
+     rules = RuleCollections.buildLoadFieldsRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointFullDfs, rules));
+
+     // fuzzy join
+     rules = RuleCollections.buildFuzzyJoinRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointFullDfs, rules));
+
+     // normalization, condition push-down and load-fields are scheduled a second time
+     rules = RuleCollections.buildNormalizationRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointFullDfs, rules));
+
+     rules = RuleCollections.buildCondPushDownAndJoinInferenceRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointNoDfs, rules));
+
+     rules = RuleCollections.buildLoadFieldsRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointFullDfs, rules));
+
+     rules = RuleCollections.buildDataExchangeRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(once, rules));
+
+     rules = RuleCollections.buildConsolidationRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointNoDfs, rules));
+
+     rules = RuleCollections.buildOpPushDownRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(fixpointNoDfs, rules));
+
+     return stages;
+ }
+
+ /**
+  * Assembles the default sequence of physical rewrite stages, each pairing a
+  * SequentialOnceRuleController with one rule collection from RuleCollections.
+  */
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildDefaultPhysicalRewrites() {
+     List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> stages = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+
+     AbstractRuleController once = new SequentialOnceRuleController(true);
+     AbstractRuleController onceTopLevel = new SequentialOnceRuleController(false);
+
+     List<IAlgebraicRewriteRule> rules;
+
+     rules = RuleCollections.buildPhysicalRewritesAllLevelsRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(once, rules));
+
+     rules = RuleCollections.buildPhysicalRewritesTopLevelRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(onceTopLevel, rules));
+
+     rules = RuleCollections.prepareForJobGenRuleCollection();
+     stages.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(once, rules));
+
+     return stages;
+ }
+
+ /**
+  * Singleton {@link IOptimizationContextFactory} that creates
+  * {@link AlgebricksOptimizationContext} instances, forwarding every argument
+  * unchanged.
+  */
+ private static class AqlOptimizationContextFactory implements IOptimizationContextFactory {
+
+     /** The only instance; the constructor is private. */
+     public static final AqlOptimizationContextFactory INSTANCE = new AqlOptimizationContextFactory();
+
+     private AqlOptimizationContextFactory() {
+         // not instantiable from outside; use INSTANCE
+     }
+
+     @Override
+     public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+             IExpressionEvalSizeComputer expressionEvalSizeComputer,
+             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+             PhysicalOptimizationConfig physicalOptimizationConfig) {
+         final IOptimizationContext context = new AlgebricksOptimizationContext(varCounter, frameSize,
+                 expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
+                 nullableTypeComputer, physicalOptimizationConfig);
+         return context;
+     }
+
+ }
+
+ /**
+ * Selects how the compilation stages printed by this class are framed in the
+ * output.
+ */
+ public enum DisplayFormat {
+ // Sections are introduced by a plain "----------..." header line.
+ TEXT,
+ // Sections are introduced by an <H1> heading and wrapped in a <PRE> block.
+ HTML
+ }
+
+ /**
+  * Translates the DDL statements in the prolog of the given query inside a
+  * metadata transaction that holds the metadata lock in exclusive mode.
+  *
+  * @param query
+  *            query whose prolog declaration list is handed to the DdlTranslator
+  * @param out
+  *            writer passed through to the translator
+  * @param pc
+  *            session configuration passed through to the translator
+  * @param pdf
+  *            display format passed through to the translator
+  * @return the dataverse name of the translator's compiled declarations
+  * @throws AlgebricksException
+  *             wrapping any exception raised during translation or commit; the
+  *             metadata transaction is aborted before it is thrown
+  */
+ public static String compileDdlStatements(Query query, PrintWriter out, SessionConfig pc, DisplayFormat pdf)
+         throws AsterixException, AlgebricksException, JSONException, RemoteException, ACIDException {
+     // Begin a transaction against the metadata.
+     // Lock the metadata in X mode to protect against other DDL and DML.
+     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+     MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.EXCLUSIVE);
+     try {
+         DdlTranslator ddlt = new DdlTranslator(mdTxnCtx, query.getPrologDeclList(), out, pc, pdf);
+         ddlt.translate(false);
+         // Read the result before committing: commit is the last action in the
+         // try block, so nothing that runs after a successful commit can land
+         // in the catch below and abort an already-committed transaction.
+         String dataverseName = ddlt.getCompiledDeclarations().getDataverseName();
+         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+         return dataverseName;
+     } catch (Exception e) {
+         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+         // The original exception travels as the cause, so its stack trace is
+         // preserved for whoever handles the AlgebricksException.
+         throw new AlgebricksException(e);
+     }
+ }
+
+ /**
+  * Translates the DML statements in the prolog of the given query and builds
+  * one or more Hyracks jobs for each of them, inside a metadata transaction
+  * that holds the metadata lock in shared mode.
+  *
+  * @param dataverseName
+  *            dataverse name forwarded to query compilation
+  * @param query
+  *            query whose prolog declaration list is handed to the DmlTranslator
+  * @param out
+  *            writer that receives any requested compilation/job printouts
+  * @param pc
+  *            session configuration; its print flags are copied for the nested
+  *            query compilations
+  * @param pdf
+  *            display format forwarded to query compilation
+  * @return the jobs to run, in statement order; an empty array when the prolog
+  *         contains no DML statement
+  * @throws AsterixException
+  *             also used to wrap any exception other than AsterixException,
+  *             AlgebricksException and JSONException raised while compiling;
+  *             the metadata transaction is aborted before any exception leaves
+  *             the compilation
+  */
+ public static Job[] compileDmlStatements(String dataverseName, Query query, PrintWriter out, SessionConfig pc,
+         DisplayFormat pdf) throws AsterixException, AlgebricksException, JSONException, RemoteException,
+         ACIDException {
+
+     // Begin a transaction against the metadata.
+     // Lock the metadata in S mode to protect against other DDL
+     // modifications.
+     MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+     MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.SHARED);
+     try {
+         DmlTranslator dmlt = new DmlTranslator(mdTxnCtx, query.getPrologDeclList());
+         dmlt.translate();
+
+         if (dmlt.getCompiledDmlStatements().size() == 0) {
+             // There is no DML to run. Consider the transaction against the
+             // metadata successful.
+             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+             return new Job[] {};
+         }
+
+         List<Job> dmlJobs = new ArrayList<Job>();
+         AqlCompiledMetadataDeclarations metadata = dmlt.getCompiledDeclarations();
+
+         if (!metadata.isConnectedToDataverse()) {
+             metadata.connectToDataverse(metadata.getDataverseName());
+         }
+
+         for (ICompiledDmlStatement stmt : dmlt.getCompiledDmlStatements()) {
+             switch (stmt.getKind()) {
+                 case LOAD_FROM_FILE: {
+                     CompiledLoadFromFileStatement loadStmt = (CompiledLoadFromFileStatement) stmt;
+                     dmlJobs.addAll(DatasetOperations.createLoadDatasetJobSpec(loadStmt, metadata));
+                     break;
+                 }
+                 case WRITE_FROM_QUERY_RESULT: {
+                     CompiledWriteFromQueryResultStatement writeStmt = (CompiledWriteFromQueryResultStatement) stmt;
+                     dmlJobs.add(compileDmlQueryJob(mdTxnCtx, dataverseName, writeStmt.getQuery(),
+                             writeStmt.getVarCounter(), writeStmt.getDatasetName(), metadata, pc, out, pdf,
+                             Statement.Kind.WRITE_FROM_QUERY_RESULT));
+                     break;
+                 }
+                 case INSERT: {
+                     CompiledInsertStatement insertStmt = (CompiledInsertStatement) stmt;
+                     dmlJobs.add(compileDmlQueryJob(mdTxnCtx, dataverseName, insertStmt.getQuery(),
+                             insertStmt.getVarCounter(), insertStmt.getDatasetName(), metadata, pc, out, pdf,
+                             Statement.Kind.INSERT));
+                     break;
+                 }
+                 case DELETE: {
+                     CompiledDeleteStatement deleteStmt = (CompiledDeleteStatement) stmt;
+                     dmlJobs.add(compileDmlQueryJob(mdTxnCtx, dataverseName, deleteStmt.getQuery(),
+                             deleteStmt.getVarCounter(), deleteStmt.getDatasetName(), metadata, pc, out, pdf,
+                             Statement.Kind.DELETE));
+                     break;
+                 }
+                 case CREATE_INDEX: {
+                     CompiledCreateIndexStatement cis = (CompiledCreateIndexStatement) stmt;
+                     JobSpecification jobSpec = IndexOperations.buildCreateIndexJobSpec(cis, metadata);
+                     dmlJobs.add(new Job(jobSpec));
+                     break;
+                 }
+                 case BEGIN_FEED: {
+                     CompiledBeginFeedStatement cbfs = (CompiledBeginFeedStatement) stmt;
+                     dmlJobs.add(compileDmlQueryJob(mdTxnCtx, dataverseName, cbfs.getQuery(), cbfs.getVarCounter(),
+                             cbfs.getDatasetName().getValue(), metadata, pc, out, pdf, Statement.Kind.BEGIN_FEED));
+                     break;
+                 }
+                 case CONTROL_FEED: {
+                     CompiledControlFeedStatement cfs = (CompiledControlFeedStatement) stmt;
+                     // Feed control jobs are the only ones submitted asynchronously.
+                     Job job = new Job(FeedOperations.buildControlFeedJobSpec(cfs, metadata),
+                             SubmissionMode.ASYNCHRONOUS);
+                     dmlJobs.add(job);
+                     break;
+                 }
+                 default: {
+                     throw new IllegalArgumentException();
+                 }
+             }
+         }
+         if (pc.isPrintJob()) {
+             int i = 0;
+             for (Job js : dmlJobs) {
+                 out.println("<H1>Hyracks job number " + i + ":</H1>");
+                 out.println("<PRE>");
+                 out.println(js.getJobSpec().toJSON().toString(1));
+                 out.println(js.getJobSpec().getUserConstraints());
+                 out.println(js.getSubmissionMode());
+                 out.println("</PRE>");
+                 i++;
+             }
+         }
+         // close connection to dataverse
+         if (metadata.isConnectedToDataverse()) {
+             metadata.disconnectFromDataverse();
+         }
+
+         Job[] jobs = dmlJobs.toArray(new Job[0]);
+         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+         return jobs;
+     } catch (AsterixException e) {
+         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+         throw e;
+     } catch (AlgebricksException e) {
+         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+         throw e;
+     } catch (JSONException e) {
+         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+         throw e;
+     } catch (Exception e) {
+         MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+         throw new AsterixException(e);
+     }
+ }
+
+ /**
+  * Compiles the query behind one DML statement into a job. The caller's
+  * print flags are copied into a fresh SessionConfig that always has job-spec
+  * generation switched on, so the compilation yields a job specification.
+  */
+ private static Job compileDmlQueryJob(MetadataTransactionContext mdTxnCtx, String dataverseName, Query query,
+         int varCounter, String datasetName, AqlCompiledMetadataDeclarations metadata, SessionConfig pc,
+         PrintWriter out, DisplayFormat pdf, Statement.Kind dmlKind) throws AsterixException, AlgebricksException,
+         JSONException, RemoteException, ACIDException {
+     SessionConfig sc2 = new SessionConfig(pc.getPort(), true, pc.isPrintExprParam(),
+             pc.isPrintRewrittenExprParam(), pc.isPrintLogicalPlanParam(), pc.isPrintOptimizedLogicalPlanParam(),
+             pc.isPrintPhysicalOpsOnly(), pc.isPrintJob());
+     sc2.setGenerateJobSpec(true);
+     Pair<AqlCompiledMetadataDeclarations, JobSpecification> mj = compileQueryInternal(mdTxnCtx, dataverseName,
+             query, varCounter, datasetName, metadata, sc2, out, pdf, dmlKind);
+     return new Job(mj.second);
+ }
+
+ /**
+  * Compiles a query in a metadata transaction of its own: begins the
+  * transaction, takes the metadata lock in shared mode, delegates to
+  * compileQueryInternal and commits. On any exception the transaction is
+  * aborted first; AsterixException, AlgebricksException, JSONException,
+  * RemoteException and ACIDException are then rethrown unchanged, and every
+  * other exception is wrapped in an AsterixException.
+  *
+  * @return whatever compileQueryInternal returns for the same arguments
+  */
+ public static Pair<AqlCompiledMetadataDeclarations, JobSpecification> compileQuery(String dataverseName, Query q,
+         int varCounter, String outputDatasetName, AqlCompiledMetadataDeclarations metadataDecls, SessionConfig pc,
+         PrintWriter out, DisplayFormat pdf, Statement.Kind dmlKind) throws AsterixException, AlgebricksException,
+         JSONException, RemoteException, ACIDException {
+     final MetadataTransactionContext txn = MetadataManager.INSTANCE.beginTransaction();
+     try {
+         MetadataManager.INSTANCE.lock(txn, LockMode.SHARED);
+         final Pair<AqlCompiledMetadataDeclarations, JobSpecification> compiled = compileQueryInternal(txn,
+                 dataverseName, q, varCounter, outputDatasetName, metadataDecls, pc, out, pdf, dmlKind);
+         MetadataManager.INSTANCE.commitTransaction(txn);
+         return compiled;
+     } catch (Exception failure) {
+         MetadataManager.INSTANCE.abortTransaction(txn);
+         // Declared exception types pass through untouched ...
+         if (failure instanceof AsterixException) {
+             throw (AsterixException) failure;
+         }
+         if (failure instanceof AlgebricksException) {
+             throw (AlgebricksException) failure;
+         }
+         if (failure instanceof JSONException) {
+             throw (JSONException) failure;
+         }
+         if (failure instanceof RemoteException) {
+             throw (RemoteException) failure;
+         }
+         if (failure instanceof ACIDException) {
+             throw (ACIDException) failure;
+         }
+         // ... anything else is wrapped.
+         throw new AsterixException(failure);
+     }
+ }
+
+ /**
+ * Compiles a query inside a metadata transaction supplied by the caller:
+ * rewrites the query, translates it to a logical plan, optionally optimizes
+ * the plan and, when the session requests it, generates a Hyracks job
+ * specification. Intermediate forms are printed to out according to the print
+ * flags of pc, framed as HTML or plain text depending on pdf.
+ *
+ * @param mdTxnCtx
+ * metadata transaction passed to the rewriter and the translator;
+ * it is committed here only on the "no job spec requested" path
+ * @param dataverseName
+ * passed to the AqlRewriter
+ * @param q
+ * the query to compile
+ * @param varCounter
+ * passed to the AqlRewriter
+ * @param outputDatasetName
+ * passed to the translator; when null, the metadata declarations
+ * must provide an output file
+ * @param metadataDecls
+ * declarations to translate against; when null, the declarations of
+ * the translated plan's metadata provider are used
+ * @param pc
+ * session flags controlling printing, optimization and job generation
+ * @param out
+ * destination of all printed output
+ * @param pdf
+ * framing of the printed output
+ * @param dmlKind
+ * statement kind passed to the translator
+ * @return the metadata declarations paired with the generated job
+ * specification, or null when pc.isGenerateJobSpec() is false
+ * @throws AlgebricksException
+ * when neither an output dataset name nor an output file is known,
+ * or when the frame-size system property is not an integer
+ */
+ public static Pair<AqlCompiledMetadataDeclarations, JobSpecification> compileQueryInternal(
+ MetadataTransactionContext mdTxnCtx, String dataverseName, Query q, int varCounter,
+ String outputDatasetName, AqlCompiledMetadataDeclarations metadataDecls, SessionConfig pc, PrintWriter out,
+ DisplayFormat pdf, Statement.Kind dmlKind) throws AsterixException, AlgebricksException, JSONException,
+ RemoteException, ACIDException {
+
+ // Stage 1: print the expression tree as parsed, if requested.
+ if (!pc.isPrintPhysicalOpsOnly() && pc.isPrintExprParam()) {
+ out.println();
+ switch (pdf) {
+ case HTML: {
+ out.println("<H1>Expression tree:</H1>");
+ out.println("<PRE>");
+ break;
+ }
+ case TEXT: {
+ out.println("----------Expression tree:");
+ break;
+ }
+ }
+ if (q != null) {
+ q.accept(new AQLPrintVisitor(out), 0);
+ }
+ switch (pdf) {
+ case HTML: {
+ out.println("</PRE>");
+ break;
+ }
+ }
+ }
+ // Stage 2: rewrite the query and print the rewritten form, if requested.
+ AqlRewriter rw = new AqlRewriter(q, varCounter, mdTxnCtx, dataverseName);
+ rw.rewrite();
+ Query rwQ = rw.getExpr();
+ if (!pc.isPrintPhysicalOpsOnly() && pc.isPrintRewrittenExprParam()) {
+ out.println();
+
+ switch (pdf) {
+ case HTML: {
+ out.println("<H1>Rewriten expression tree:</H1>");
+ out.println("<PRE>");
+ break;
+ }
+ case TEXT: {
+ out.println("----------Rewritten expression:");
+ break;
+ }
+ }
+
+ // NOTE(review): the guard tests q, but the object dereferenced is rwQ.
+ if (q != null) {
+ rwQ.accept(new AQLPrintVisitor(out), 0);
+ }
+
+ switch (pdf) {
+ case HTML: {
+ out.println("</PRE>");
+ break;
+ }
+ }
+
+ }
+ // Stage 3: translate the rewritten query into a logical plan. The same
+ // transaction id is later given to the job event listener.
+ long txnId = TransactionIDFactory.generateTransactionId();
+ AqlExpressionToPlanTranslator t = new AqlExpressionToPlanTranslator(txnId, mdTxnCtx, rw.getVarCounter(),
+ outputDatasetName, dmlKind);
+
+ ILogicalPlanAndMetadata planAndMetadata = t.translate(rwQ, metadataDecls);
+ boolean isWriteTransaction = false;
+ AqlMetadataProvider mp = (AqlMetadataProvider) planAndMetadata.getMetadataProvider();
+ if (metadataDecls == null) {
+ // No declarations were supplied: fall back to the ones of the translated plan.
+ metadataDecls = mp.getMetadataDeclarations();
+ }
+ isWriteTransaction = mp.isWriteTransaction();
+
+ // The result needs a destination: either an output dataset or an output file.
+ if (outputDatasetName == null && metadataDecls.getOutputFile() == null) {
+ throw new AlgebricksException("Unknown output file: `write output to nc:\"file\"' statement missing.");
+ }
+
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+ if (!pc.isPrintPhysicalOpsOnly() && pc.isPrintLogicalPlanParam()) {
+
+ switch (pdf) {
+ case HTML: {
+ out.println("<H1>Logical plan:</H1>");
+ out.println("<PRE>");
+ break;
+ }
+ case TEXT: {
+ out.println("----------Logical plan:");
+ break;
+ }
+ }
+
+ if (q != null) {
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(planAndMetadata.getPlan(), buffer, pvisitor, 0);
+ out.print(buffer);
+ }
+
+ switch (pdf) {
+ case HTML: {
+ out.println("</PRE>");
+ break;
+ }
+ }
+ }
+
+ // Frame size: the default, unless the system property holds a
+ // non-negative integer. A negative value is silently ignored.
+ int frameSize = GlobalConfig.DEFAULT_FRAME_SIZE;
+ String frameSizeStr = System.getProperty(GlobalConfig.FRAME_SIZE_PROPERTY);
+ if (frameSizeStr != null) {
+ int fz = -1;
+ try {
+ fz = Integer.parseInt(frameSizeStr);
+ } catch (NumberFormatException nfe) {
+ // NOTE(review): the message announces a fallback to the default, but
+ // the exception thrown below aborts the compilation instead.
+ GlobalConfig.ASTERIX_LOGGER.warning("Wrong frame size size argument. Picking default value ("
+ + GlobalConfig.DEFAULT_FRAME_SIZE + ") instead.\n");
+ throw new AlgebricksException(nfe);
+ }
+ if (fz >= 0) {
+ frameSize = fz;
+ }
+ }
+
+ // Stage 4: configure the compiler.
+ HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
+ AqlOptimizationContextFactory.INSTANCE);
+ builder.setLogicalRewrites(buildDefaultLogicalRewrites());
+ builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
+ IDataFormat format = metadataDecls.getFormat();
+ // NOTE(review): the factory is created before the remaining builder
+ // setters run (here and further below); presumably the factory reads the
+ // builder's settings when it is used rather than when it is created.
+ // Confirm before reordering any of these statements.
+ ICompilerFactory compilerFactory = builder.create();
+ builder.setFrameSize(frameSize);
+ builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
+ builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory());
+ builder.setPartialAggregationTypeComputer(new AqlPartialAggregationTypeComputer());
+ builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
+ builder.setNullableTypeComputer(AqlNullableTypeComputer.INSTANCE);
+
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
+ builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+ ICompiler compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
+ planAndMetadata.getMetadataProvider(), t.getVarCounter());
+ // Stage 5: optimize and print the outcome, if requested. The
+ // physical-ops-only printout takes precedence over the optimized plan.
+ if (pc.isOptimize()) {
+ compiler.optimize();
+ if (pc.isPrintPhysicalOpsOnly()) {
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPhysicalOps(planAndMetadata.getPlan(), buffer, 0);
+ out.print(buffer);
+ } else if (pc.isPrintOptimizedLogicalPlanParam()) {
+ switch (pdf) {
+ case HTML: {
+ out.println("<H1>Optimized logical plan:</H1>");
+ out.println("<PRE>");
+ break;
+ }
+ case TEXT: {
+ out.println("----------Optimized plan ");
+ break;
+ }
+ }
+
+ if (q != null) {
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(planAndMetadata.getPlan(), buffer, pvisitor, 0);
+ out.print(buffer);
+ }
+ switch (pdf) {
+ case HTML: {
+ out.println("</PRE>");
+ break;
+ }
+ }
+ }
+ }
+
+ if (!pc.isGenerateJobSpec()) {
+ // Job spec not requested. Consider transaction against metadata
+ // committed.
+ // NOTE(review): compileQuery commits the same transaction again after
+ // this method returns, so this path appears to commit twice when
+ // reached through compileQuery. Confirm that is harmless.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return null;
+ }
+
+ // Stage 6: supply the job-generation settings and create the job.
+ AlgebricksPartitionConstraint clusterLocs = planAndMetadata.getClusterLocations();
+ builder.setBinaryBooleanInspector(format.getBinaryBooleanInspector());
+ builder.setBinaryIntegerInspector(format.getBinaryIntegerInspector());
+ builder.setClusterLocations(clusterLocs);
+ builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
+ builder.setExprJobGen(AqlLogicalExpressionJobGen.INSTANCE);
+ builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
+ builder.setNullWriterFactory(format.getNullWriterFactory());
+ builder.setPrinterProvider(format.getPrinterFactoryProvider());
+ builder.setSerializerDeserializerProvider(format.getSerdeProvider());
+ builder.setTypeTraitProvider(format.getTypeTraitProvider());
+ builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
+
+ JobSpecification spec = compiler.createJob(AsterixAppContextInfoImpl.INSTANCE);
+ // set the job event listener
+ spec.setJobletEventListenerFactory(new JobEventListenerFactory(txnId, isWriteTransaction));
+ if (pc.isPrintJob()) {
+ switch (pdf) {
+ case HTML: {
+ out.println("<H1>Hyracks job:</H1>");
+ out.println("<PRE>");
+ break;
+ }
+ case TEXT: {
+ out.println("----------Hyracks job:");
+ break;
+ }
+ }
+ if (q != null) {
+ out.println(spec.toJSON().toString(1));
+ out.println(spec.getUserConstraints());
+ }
+ switch (pdf) {
+ case HTML: {
+ out.println("</PRE>");
+ break;
+ }
+ }
+ }
+ return new Pair<AqlCompiledMetadataDeclarations, JobSpecification>(metadataDecls, spec);
+ }
+
+ /**
+  * Runs the given job specifications one after another against the Hyracks
+  * instance at localhost:port, waiting for each to complete and printing its
+  * wall-clock duration in seconds to out.
+  *
+  * @param specs
+  *            job specifications to run, in order; reattempts are set to 0 on each
+  * @param port
+  *            port of the Hyracks instance on localhost
+  * @param out
+  *            receives one "Duration" line per job
+  * @param pdf
+  *            not used by this method
+  * @throws Exception
+  *             propagated from the Hyracks connection
+  */
+ public static void executeJobArray(JobSpecification[] specs, int port, PrintWriter out, DisplayFormat pdf)
+         throws Exception {
+     final IHyracksClientConnection connection = new HyracksConnection("localhost", port);
+
+     for (JobSpecification spec : specs) {
+         spec.setMaxReattempts(0);
+         final JobId id = connection.createJob(GlobalConfig.HYRACKS_APP_NAME, spec);
+         final long startedAt = System.currentTimeMillis();
+         connection.start(id);
+         connection.waitForCompletion(id);
+         final long elapsedMillis = System.currentTimeMillis() - startedAt;
+         final double duration = elapsedMillis / 1000.00;
+         out.println("<PRE>Duration: " + duration + "</PRE>");
+     }
+
+ }
+
+ /**
+  * Runs the given jobs one after another against the Hyracks instance at
+  * localhost:port. A job whose submission mode is ASYNCHRONOUS is only
+  * started; every other job is waited for and its wall-clock duration in
+  * seconds is printed to out. A failure while starting or waiting for a job
+  * has its stack trace printed and does not stop the remaining jobs.
+  *
+  * @param jobs
+  *            jobs to run, in order; reattempts are set to 0 on each job spec
+  * @param port
+  *            port of the Hyracks instance on localhost
+  * @param out
+  *            receives one "Duration" line per job that was waited for
+  * @param pdf
+  *            not used by this method
+  * @throws Exception
+  *             propagated from opening the connection or creating a job
+  */
+ public static void executeJobArray(Job[] jobs, int port, PrintWriter out, DisplayFormat pdf) throws Exception {
+     final IHyracksClientConnection connection = new HyracksConnection("localhost", port);
+
+     for (Job job : jobs) {
+         job.getJobSpec().setMaxReattempts(0);
+         final JobId id = connection.createJob(GlobalConfig.HYRACKS_APP_NAME, job.getJobSpec());
+         final long startedAt = System.currentTimeMillis();
+
+         // Becomes true only when the job was waited for and finished without error.
+         boolean completed = false;
+         try {
+             connection.start(id);
+             if (job.getSubmissionMode() != SubmissionMode.ASYNCHRONOUS) {
+                 connection.waitForCompletion(id);
+                 completed = true;
+             }
+         } catch (Exception failure) {
+             failure.printStackTrace();
+         }
+
+         if (completed) {
+             final long elapsedMillis = System.currentTimeMillis() - startedAt;
+             final double duration = elapsedMillis / 1000.00;
+             out.println("<PRE>Duration: " + duration + "</PRE>");
+         }
+     }
+
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppContextInfoImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppContextInfoImpl.java
new file mode 100644
index 0000000..5432fb9
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppContextInfoImpl.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.asterix.api.common;
+
+import edu.uci.ics.asterix.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.dataflow.base.IAsterixApplicationContextInfo;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+/** Singleton that hands out the shared AsterixTreeRegistryProvider and AsterixStorageManagerInterface instances. */
+public class AsterixAppContextInfoImpl implements IAsterixApplicationContextInfo {
+
+    public static final AsterixAppContextInfoImpl INSTANCE = new AsterixAppContextInfoImpl();
+
+    private AsterixAppContextInfoImpl() { // private: INSTANCE is the only object of this class
+    }
+
+    @Override
+    public IStorageManagerInterface getStorageManagerInterface() {
+        return AsterixStorageManagerInterface.INSTANCE; // always the same shared object
+    }
+
+    @Override
+    public IIndexRegistryProvider<IIndex> getTreeRegisterProvider() {
+        return AsterixTreeRegistryProvider.INSTANCE; // always the same shared object
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixClientConfig.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixClientConfig.java
new file mode 100644
index 0000000..70ce277
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixClientConfig.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.asterix.api.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.Option;
+
+public class AsterixClientConfig { // option holder annotated for args4j; presumably filled by a CmdLineParser in the caller
+ @Option(name = "-optimize", usage = "Turns compiler optimizations on (if set to true) or off (if set to false). It is true by default.")
+ public String optimize = "true"; // declared as String, not boolean; default "true"
+
+ @Option(name = "-only-physical", usage = "Prints only the physical annotations, not the entire operators. It is false by default.")
+ public String onlyPhysical = "false"; // String-typed flag; default "false"
+
+ @Option(name = "-execute", usage = "Executes the job produced by the compiler. It is false by default.")
+ public String execute = "false"; // String-typed flag; default "false"
+
+ @Option(name = "-hyracks-job", usage = "Generates and prints the Hyracks job. It is false by default.")
+ public String hyracksJob = "false"; // String-typed flag; default "false"
+
+ @Option(name = "-hyracks-port", usage = "The port used to connect to the Hyracks server.")
+ public int hyracksPort = AsterixHyracksIntegrationUtil.DEFAULT_HYRACKS_CC_CLIENT_PORT; // default: the integration utility's client port
+
+ @Argument
+ private List<String> arguments = new ArrayList<String>(); // bound via @Argument; presumably the non-option arguments - confirm
+
+ public List<String> getArguments() { // returns the live list itself, not a copy
+ return arguments;
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..4e9f3ef
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.asterix.api.common;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+/** Runs an in-process Hyracks cluster on the loopback address: one cluster controller and two node controllers. */
+public class AsterixHyracksIntegrationUtil {
+
+    public static final String NC1_ID = "nc1";
+    public static final String NC2_ID = "nc2";
+
+    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+    public static final int FRAME_SIZE = 32768;
+
+    private static ClusterControllerService cc;
+    private static NodeControllerService nc1;
+    private static NodeControllerService nc2;
+    private static IHyracksClientConnection hcc;
+
+    /** Starts the cluster controller and both node controllers, then connects and creates the application. */
+    public static void init() throws Exception {
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
+        ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+        ccConfig.defaultMaxJobAttempts = 0;
+        cc = new ClusterControllerService(ccConfig);
+        cc.start();
+        nc1 = newNodeController(NC1_ID); // assigned before start() so that deinit() can still stop it
+        nc1.start();
+        nc2 = newNodeController(NC2_ID);
+        nc2.start();
+        hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
+        hcc.createApplication(GlobalConfig.HYRACKS_APP_NAME, null);
+    }
+
+    /** Builds a not-yet-started node controller with the given id, pointed at the local cluster controller. */
+    private static NodeControllerService newNodeController(String nodeId) throws Exception {
+        NCConfig ncConfig = new NCConfig();
+        ncConfig.ccHost = "localhost";
+        ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+        ncConfig.clusterNetIPAddress = "127.0.0.1";
+        ncConfig.dataIPAddress = "127.0.0.1";
+        ncConfig.nodeId = nodeId;
+        ncConfig.frameSize = FRAME_SIZE;
+        return new NodeControllerService(ncConfig);
+    }
+
+    public static void destroyApp() throws Exception { // requires a prior init(): uses its client connection
+        hcc.destroyApplication(GlobalConfig.HYRACKS_APP_NAME);
+    }
+
+    public static void createApp() throws Exception { // requires a prior init(): uses its client connection
+        hcc.createApplication(GlobalConfig.HYRACKS_APP_NAME, null);
+    }
+
+    /** Stops nc2, nc1 and the cluster controller, skipping any that init() never got as far as creating. */
+    public static void deinit() throws Exception {
+        if (nc2 != null) {
+            nc2.stop();
+        }
+        if (nc1 != null) {
+            nc1.stop();
+        }
+        if (cc != null) {
+            cc.stop();
+        }
+    }
+
+    /** Creates the job with runtime profiling, logs its JSON and its id, starts it and waits for completion. */
+    public static void runJob(JobSpecification spec) throws Exception {
+        JobId jobId = hcc.createJob(GlobalConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
+        hcc.start(jobId);
+        GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
+        hcc.waitForCompletion(jobId);
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/Job.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/Job.java
new file mode 100644
index 0000000..18b08da
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/Job.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.common;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/** Immutable pairing of a Hyracks job specification with the mode in which it is to be submitted. */
+public class Job {
+    /** How the job is submitted; a job built without an explicit mode is SYNCHRONOUS. */
+    public enum SubmissionMode {
+        SYNCHRONOUS,
+        ASYNCHRONOUS
+    }
+
+    private final JobSpecification jobSpec;
+    private final SubmissionMode submissionMode;
+
+    /** Creates a job that is submitted synchronously. */
+    public Job(JobSpecification jobSpec) {
+        this(jobSpec, SubmissionMode.SYNCHRONOUS);
+    }
+
+    public Job(JobSpecification jobSpecification, SubmissionMode submissionMode) { // both stored as given
+        this.submissionMode = submissionMode;
+        this.jobSpec = jobSpecification;
+    }
+
+    public SubmissionMode getSubmissionMode() {
+        return submissionMode;
+    }
+
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
new file mode 100644
index 0000000..8050a3f
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
@@ -0,0 +1,101 @@
+/**
+ *
+ */
+package edu.uci.ics.asterix.api.common;
+
+/**
+ * Settings for one compilation session: the Hyracks port to use, whether to optimize, whether to generate a job
+ * specification, and the print* flags that (per their names) select which intermediate results are shown.
+ */
+public class SessionConfig {
+    private int port;
+    private boolean printExprParam;
+    private boolean printRewrittenExprParam;
+    private boolean printLogicalPlanParam;
+    private boolean printOptimizedLogicalPlanParam;
+    private boolean printPhysicalOpsOnly;
+    private boolean printJob;
+    private boolean optimize;
+    private boolean generateJobSpec = true; // the only setting the constructor leaves at its default
+
+    /**
+     * Applies every argument through its setter; generateJobSpec is not a parameter and stays true.
+     */
+    public SessionConfig(int port, boolean optimize, boolean printExprParam, boolean printRewrittenExprParam,
+            boolean printLogicalPlanParam, boolean printOptimizedLogicalPlanParam, boolean printPhysicalOpsOnly,
+            boolean printJob) {
+        setPort(port);
+        setOptimize(optimize);
+        setPrintExprParam(printExprParam);
+        setPrintRewrittenExprParam(printRewrittenExprParam);
+        setPrintLogicalPlanParam(printLogicalPlanParam);
+        setPrintOptimizedLogicalPlanParam(printOptimizedLogicalPlanParam);
+        setPrintPhysicalOpsOnly(printPhysicalOpsOnly);
+        setPrintJob(printJob);
+    }
+
+    // Accessors follow, one getter/setter pair per setting.
+
+    public int getPort() {
+        return port;
+    }
+    public void setPort(int value) {
+        port = value;
+    }
+
+    public boolean isOptimize() {
+        return optimize;
+    }
+    public void setOptimize(boolean enabled) {
+        optimize = enabled;
+    }
+
+    public boolean isGenerateJobSpec() {
+        return generateJobSpec;
+    }
+    public void setGenerateJobSpec(boolean enabled) {
+        generateJobSpec = enabled;
+    }
+
+    public boolean isPrintExprParam() {
+        return printExprParam;
+    }
+    public void setPrintExprParam(boolean enabled) {
+        printExprParam = enabled;
+    }
+
+    public boolean isPrintRewrittenExprParam() {
+        return printRewrittenExprParam;
+    }
+    public void setPrintRewrittenExprParam(boolean enabled) {
+        printRewrittenExprParam = enabled;
+    }
+
+    public boolean isPrintLogicalPlanParam() {
+        return printLogicalPlanParam;
+    }
+    public void setPrintLogicalPlanParam(boolean enabled) {
+        printLogicalPlanParam = enabled;
+    }
+
+    public boolean isPrintOptimizedLogicalPlanParam() {
+        return printOptimizedLogicalPlanParam;
+    }
+    public void setPrintOptimizedLogicalPlanParam(boolean enabled) {
+        printOptimizedLogicalPlanParam = enabled;
+    }
+
+    public boolean isPrintPhysicalOpsOnly() {
+        return printPhysicalOpsOnly;
+    }
+    public void setPrintPhysicalOpsOnly(boolean enabled) {
+        printPhysicalOpsOnly = enabled;
+    }
+
+    public boolean isPrintJob() {
+        return printJob;
+    }
+    public void setPrintJob(boolean enabled) {
+        printJob = enabled;
+    }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
new file mode 100644
index 0000000..b74f8a7
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -0,0 +1,161 @@
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import edu.uci.ics.asterix.api.common.APIFramework;
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/** HTML front end for AQL: GET serves a query form and POST compiles and runs the submitted statements. */
+public class APIServlet extends HttpServlet {
+    private static final long serialVersionUID = 1L;
+
+    /** Compiles and runs the "query" parameter on Hyracks and writes an HTML report; echoed text is escaped. */
+    @Override
+    public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        String query = request.getParameter("query");
+        String printExprParam = request.getParameter("print-expr-tree");
+        String printRewrittenExprParam = request.getParameter("print-rewritten-expr-tree");
+        String printLogicalPlanParam = request.getParameter("print-logical-plan");
+        String printOptimizedLogicalPlanParam = request.getParameter("print-optimized-logical-plan");
+        String printJob = request.getParameter("print-job");
+        String strPort = request.getParameter("hyracks-port");
+        String strDisplayResult = request.getParameter("display-result");
+        response.setContentType("text/html"); // before getWriter(): afterwards a charset in the type is ignored
+        PrintWriter out = response.getWriter();
+        out.println("<H1>Input statements:</H1>");
+        printInHtml(out, query);
+        try {
+            int port = Integer.parseInt(strPort); // inside the try so that a bad port is reported on the page
+            AQLParser parser = new AQLParser(new StringReader(query));
+            Query q = (Query) parser.Statement();
+            SessionConfig pc = new SessionConfig(port, true, isSet(printExprParam), isSet(printRewrittenExprParam),
+                    isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), false, isSet(printJob));
+            pc.setGenerateJobSpec(true);
+            MetadataManager.INSTANCE.init();
+            if (q == null) {
+                return; // nothing was parsed, so there is nothing to compile or run
+            }
+            String dataverseName = postDmlStatement(q, out, pc);
+            if (q.isDummyQuery()) {
+                return;
+            }
+            Pair<AqlCompiledMetadataDeclarations, JobSpecification> metadataAndSpec = APIFramework.compileQuery(
+                    dataverseName, q, parser.getVarCounter(), null, null, pc, out, DisplayFormat.HTML, null);
+            JobSpecification spec = metadataAndSpec.second;
+            GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString(1));
+            AqlCompiledMetadataDeclarations metadata = metadataAndSpec.first;
+            long startTime = System.currentTimeMillis();
+            APIFramework.executeJobArray(new JobSpecification[] { spec }, port, out, DisplayFormat.HTML);
+            double duration = (System.currentTimeMillis() - startTime) / 1000.00;
+            out.println("<H1>Result:</H1>");
+            out.println("<PRE>");
+            out.println(metadata.getOutputFile().getNodeName() + ":"
+                    + metadata.getOutputFile().getLocalFile().getFile().getPath());
+            out.println("Duration: " + duration);
+            out.println("</PRE>");
+            if (isSet(strDisplayResult)) {
+                out.println("<PRE>");
+                displayFile(metadata.getOutputFile().getLocalFile().getFile(), out);
+                out.println("</PRE>");
+            }
+        } catch (ParseException pe) {
+            String message = String.valueOf(pe.getMessage());
+            out.println("SyntaxError:" + escapeHtml(message));
+            int pos = message.indexOf("line");
+            int columnPos = message.indexOf(",", pos + 1 + "line".length());
+            if (pos >= 0 && columnPos >= 0) { // only when the message carries a "line N," marker
+                try {
+                    int lineNo = Integer.parseInt(message.substring(pos + "line".length() + 1, columnPos));
+                    String[] lines = query.split("\n");
+                    if (lineNo >= 1 && lineNo <= lines.length) {
+                        out.println("==> " + escapeHtml(lines[lineNo - 1]));
+                    }
+                } catch (NumberFormatException ignored) {
+                    // no parsable line number: the error text alone has already been written
+                }
+            }
+        } catch (Exception e) {
+            out.println(escapeHtml(e.getMessage()));
+        }
+    }
+
+    /** Serves the HTML form whose fields carry the request parameters read by doPost. */
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        response.setContentType("text/html");
+        final String form = "<form method=\"post\">"
+                + "<center><textarea cols=\"80\" rows=\"25\" name=\"query\" ></textarea><br/>"
+                + "Port: <input type = \"text\" name = \"hyracks-port\" size=\"5\" maxlength=\"5\" value=\"1098\" /><br/>"
+                + "<input type = \"checkbox\" name = \"print-expr-tree\" value=\"true\" />print parsed expressions<P>"
+                + "<input type = \"checkbox\" name = \"print-rewritten-expr-tree\" value=\"true\" />print rewritten expressions<P>"
+                + "<input type = \"checkbox\" name = \"print-logical-plan\" value=\"true\" checked/>print logical plan<P>"
+                + "<input type = \"checkbox\" name = \"print-optimized-logical-plan\" value=\"true\" checked/>print optimized logical plan<P>"
+                + "<input type = \"checkbox\" name = \"print-job\" value=\"true\" checked/>print Hyracks job<P>"
+                + "<input type = \"checkbox\" name = \"display-result\" value=\"true\" checked/>display NFS file<P>"
+                + "<input type=\"submit\"/>" + "</center>" + "</form>";
+        response.getWriter().println(form);
+    }
+
+    /** Runs the DDL and DML statements parsed with the query and returns the dataverse name from the DDL pass. */
+    private String postDmlStatement(Query dummyQ, PrintWriter out, SessionConfig pc) throws Exception {
+        String dataverseName = APIFramework.compileDdlStatements(dummyQ, out, pc, DisplayFormat.TEXT);
+        Job[] dmlJobSpecs = APIFramework.compileDmlStatements(dataverseName, dummyQ, out, pc, DisplayFormat.HTML);
+        long startTime = System.currentTimeMillis();
+        APIFramework.executeJobArray(dmlJobSpecs, pc.getPort(), out, DisplayFormat.HTML);
+        double duration = (System.currentTimeMillis() - startTime) / 1000.00;
+        out.println("<PRE>Duration of all jobs: " + duration + "</PRE>");
+        return dataverseName;
+    }
+
+    private static boolean isSet(String requestParameter) {
+        return "true".equals(requestParameter); // null-safe: only the literal "true" counts as set
+    }
+
+    /** Escapes the characters that are markup in HTML text; null is rendered as the text "null". */
+    private static String escapeHtml(String s) {
+        return String.valueOf(s).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    }
+
+    private static void printInHtml(PrintWriter out, String s) {
+        out.println("<PRE>");
+        out.println(escapeHtml(s));
+        out.println("</PRE>");
+    }
+
+    /** Copies the file to out line by line, stopping with a notice once 501 lines have been written. */
+    private void displayFile(File localFile, PrintWriter out) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(localFile));
+        try {
+            int i = 0;
+            for (String inputLine = reader.readLine(); inputLine != null; inputLine = reader.readLine()) {
+                out.println(inputLine);
+                if (++i > 500) {
+                    out.println("...");
+                    out.println("SKIPPING THE REST OF THE RESULTS");
+                    break;
+                }
+            }
+        } finally {
+            reader.close(); // released even when reading fails part-way
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
new file mode 100644
index 0000000..0f5059a
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
@@ -0,0 +1,96 @@
+package edu.uci.ics.asterix.api.java;
+
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.rmi.RemoteException;
+
+import org.json.JSONException;
+
+import edu.uci.ics.asterix.api.common.APIFramework;
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.AsterixHyracksIntegrationUtil;
+import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/** Programmatic client: compiles the AQL read from a Reader and can then run the resulting jobs on Hyracks. */
+public class AsterixJavaClient {
+    private final Reader queryText;
+    private final PrintWriter writer;
+    private Job[] dmlJobs;
+    private JobSpecification queryJobSpec;
+
+    /** Creates a client that reports compilation output on the given writer. */
+    public AsterixJavaClient(Reader queryText, PrintWriter writer) {
+        this.queryText = queryText;
+        this.writer = writer;
+    }
+
+    /** Creates a client that reports compilation output on System.out with auto-flush. */
+    public AsterixJavaClient(Reader queryText) {
+        this(queryText, new PrintWriter(System.out, true));
+    }
+
+    /** Compiles with optimization on, nothing printed and no job specification generated. */
+    public void compile() throws Exception {
+        compile(true, false, false, false, false, false, false);
+    }
+
+    /**
+     * Parses the query text and compiles its DDL, DML and query parts, keeping the resulting jobs for execute().
+     * Earlier results are discarded first; it returns early when there is no query text or no parsed statement.
+     */
+    public void compile(boolean optimize, boolean printRewrittenExpressions, boolean printLogicalPlan,
+            boolean printOptimizedPlan, boolean printPhysicalOpsOnly, boolean generateBinaryRuntime, boolean printJob)
+            throws AsterixException, AlgebricksException, JSONException, RemoteException, ACIDException {
+        queryJobSpec = null;
+        dmlJobs = null;
+        if (queryText == null) {
+            return;
+        }
+        AQLParser parser = new AQLParser(queryText);
+        Query q;
+        try {
+            q = (Query) parser.Statement();
+        } catch (ParseException pe) {
+            throw new AsterixException(pe);
+        }
+        MetadataManager.INSTANCE.init();
+        if (q == null) {
+            return; // no statement to compile; also guards the q.isDummyQuery() call below
+        }
+        SessionConfig pc = new SessionConfig(AsterixHyracksIntegrationUtil.DEFAULT_HYRACKS_CC_CLIENT_PORT, optimize,
+                false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printPhysicalOpsOnly, printJob);
+        pc.setGenerateJobSpec(generateBinaryRuntime);
+        String dataverseName = APIFramework.compileDdlStatements(q, writer, pc, DisplayFormat.TEXT);
+        dmlJobs = APIFramework.compileDmlStatements(dataverseName, q, writer, pc, DisplayFormat.TEXT);
+        if (q.isDummyQuery()) {
+            return;
+        }
+        Pair<AqlCompiledMetadataDeclarations, JobSpecification> metadataAndSpec = APIFramework.compileQuery(
+                dataverseName, q, parser.getVarCounter(), null, null, pc, writer, DisplayFormat.TEXT, null);
+        if (metadataAndSpec != null) {
+            queryJobSpec = metadataAndSpec.second;
+        }
+        writer.flush();
+    }
+
+    /** Runs the jobs kept by the last compile() on the given Hyracks port: DML jobs first, then the query job. */
+    public void execute(int port) throws Exception {
+        if (dmlJobs != null) {
+            APIFramework.executeJobArray(dmlJobs, port, writer, DisplayFormat.TEXT);
+        }
+        if (queryJobSpec != null) {
+            APIFramework.executeJobArray(new JobSpecification[] { queryJobSpec }, port, writer, DisplayFormat.TEXT);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/DdlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/DdlTranslator.java
new file mode 100644
index 0000000..36fd75f
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/DdlTranslator.java
@@ -0,0 +1,1011 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.translator;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.base.Statement.Kind;
+import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.DatasetDecl;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
+import edu.uci.ics.asterix.aql.expression.DropStatement;
+import edu.uci.ics.asterix.aql.expression.ExternalDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.FeedDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
+import edu.uci.ics.asterix.aql.expression.InternalDetailsDecl;
+import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
+import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.Query;
+import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition.RecordKind;
+import edu.uci.ics.asterix.aql.expression.TypeDecl;
+import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
+import edu.uci.ics.asterix.aql.expression.TypeExpression;
+import edu.uci.ics.asterix.aql.expression.TypeReferenceExpression;
+import edu.uci.ics.asterix.aql.expression.UnorderedListTypeDefinition;
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.aql.util.FunctionUtil;
+import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
+import edu.uci.ics.asterix.file.DatasetOperations;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.file.IndexOperations;
+import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinArtifactMap;
+import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinArtifactMap.ARTIFACT_KIND;
+import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.Datatype;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DdlTranslator extends AbstractAqlTranslator {
+
+ private final MetadataTransactionContext mdTxnCtx; // metadata transaction handed to the MetadataManager calls
+ private final List<Statement> aqlStatements; // the statements walked by compileAndExecuteDDLstatements
+ private final PrintWriter out; // caller-supplied writer, stored as given
+ private final SessionConfig pc; // caller-supplied session settings, stored as given
+ private final DisplayFormat pdf; // caller-supplied display format, stored as given
+ private AqlCompiledMetadataDeclarations compiledDeclarations; // assigned by translate() from compileMetadata(...)
+
+ private static Map<String, BuiltinType> builtinTypeMap; // NOTE(review): static, yet reassigned by every constructor call
+
+ public DdlTranslator(MetadataTransactionContext mdTxnCtx, List<Statement> aqlStatements, PrintWriter out,
+ SessionConfig pc, DisplayFormat pdf) { // stores every argument as given: no copying, no validation
+ this.mdTxnCtx = mdTxnCtx;
+ this.aqlStatements = aqlStatements;
+ this.out = out;
+ this.pc = pc;
+ this.pdf = pdf;
+ builtinTypeMap = AsterixBuiltinTypeMap.getBuiltinTypes(); // overwrites the class-wide (static) builtin type map
+ }
+
+ public void translate(boolean disconnectFromDataverse) throws AlgebricksException { // compiles metadata, then runs the DDL
+ try {
+ compiledDeclarations = compileMetadata(mdTxnCtx, aqlStatements, true); // must come first: the DDL pass reads it
+ compileAndExecuteDDLstatements(mdTxnCtx, disconnectFromDataverse);
+ } catch (Exception e) {
+ throw new AlgebricksException(e); // every failure, an AlgebricksException included, is wrapped in a new one
+ }
+ }
+
+ private void compileAndExecuteDDLstatements(MetadataTransactionContext mdTxnCtx, boolean disconnectFromDataverse)
+ throws Exception {
+ for (Statement stmt : aqlStatements) {
+ validateOperation(compiledDeclarations, stmt);
+ switch (stmt.getKind()) {
+ // connect statement
+ case DATAVERSE_DECL: {
+ checkForDataverseConnection(false);
+ DataverseDecl dvd = (DataverseDecl) stmt;
+ String dataverseName = dvd.getDataverseName().getValue();
+ compiledDeclarations.connectToDataverse(dataverseName);
+ break;
+ }
+ // create statements
+ case CREATE_DATAVERSE: {
+ checkForDataverseConnection(false);
+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+ if (dv != null) {
+ if (!stmtCreateDataverse.getIfNotExists())
+ throw new AlgebricksException("\nA dataverse with this name " + dvName + " already exists.");
+ } else {
+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, stmtCreateDataverse
+ .getFormat()));
+ }
+ break;
+ }
+ case DATASET_DECL: {
+ checkForDataverseConnection(true);
+ DatasetDecl dd = (DatasetDecl) stmt;
+ String datasetName = dd.getName().getValue();
+ DatasetType dsType = dd.getDatasetType();
+ String itemTypeName = null;
+ IDatasetDetails datasetDetails = null;
+
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ datasetName);
+ if (ds != null) {
+ if (!dd.getIfNotExists())
+ throw new AlgebricksException("\nA dataset with this name " + datasetName
+ + " already exists.");
+ } else {
+ itemTypeName = dd.getItemTypeName().getValue();
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, compiledDeclarations
+ .getDataverseName(), itemTypeName);
+ if (dt == null)
+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
+
+ switch (dd.getDatasetType()) {
+ case INTERNAL: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName()
+ .getValue();
+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs,
+ partitioningExprs, ngName);
+ }
+ break;
+ case EXTERNAL: {
+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
+ .getProperties();
+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ }
+ break;
+ case FEED: {
+ IAType itemType = dt.getDatatype();
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only partition ARecord's.");
+ }
+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getPartitioningExprs();
+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName()
+ .getValue();
+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterClassname();
+ Map<String, String> properties = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getProperties();
+ String functionIdentifier = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
+ .getFunctionIdentifier();
+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs,
+ partitioningExprs, ngName, adapter, properties, functionIdentifier,
+ FeedDatasetDetails.FeedState.INACTIVE.toString());
+
+ }
+ break;
+ }
+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(compiledDeclarations
+ .getDataverseName(), datasetName, itemTypeName, datasetDetails, dsType));
+
+ // If the dataset is of type INTERNAL or FEED, Asterix
+ // needs to create Tree indexes at all nodes
+ // corresponding to the associated node group. This is
+ // not required for external datasets as
+ // the data for such a dataset is never persisted in
+ // Asterix storage.
+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
+ compileDatasetInitializeStatement(mdTxnCtx.getTxnId(), datasetName);
+ }
+ }
+ break;
+ }
+
+ case CREATE_INDEX: {
+ checkForDataverseConnection(true);
+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ datasetName);
+ if (ds == null)
+ throw new AlgebricksException("\nThere is no dataset with this name " + datasetName);
+ String indexName = stmtCreateIndex.getIndexName().getValue();
+ Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ datasetName, indexName);
+ if (idx != null) {
+ if (!stmtCreateIndex.getIfNotExists())
+ throw new AlgebricksException("\nAn index with this name " + indexName + " already exists.");
+ else
+ stmtCreateIndex.setNeedToCreate(false);
+ } else {
+ MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(compiledDeclarations.getDataverseName(),
+ datasetName, indexName, stmtCreateIndex.getIndexType(),
+ stmtCreateIndex.getFieldExprs(), false));
+ }
+ break;
+ }
+ case TYPE_DECL: {
+ checkForDataverseConnection(true);
+ TypeDecl stmtCreateType = (TypeDecl) stmt;
+ String typeName = stmtCreateType.getIdent().getValue();
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, compiledDeclarations
+ .getDataverseName(), typeName);
+ if (dt != null) {
+ if (!stmtCreateType.getIfNotExists())
+ throw new AlgebricksException("\nA datatype with this name " + typeName
+ + " already exists.");
+ } else {
+ if (builtinTypeMap.get(typeName) != null) {
+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
+ } else {
+ Map<String, IAType> typeMap = computeTypes(mdTxnCtx, (TypeDecl) stmt);
+ IAType type = typeMap.get(typeName);
+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(compiledDeclarations
+ .getDataverseName(), typeName, type, false));
+ }
+ }
+ break;
+ }
+ case NODEGROUP_DECL: {
+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+ if (ng != null) {
+ if (!stmtCreateNodegroup.getIfNotExists())
+ throw new AlgebricksException("\nA nodegroup with this name " + ngName + " already exists.");
+ } else {
+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
+ for (Identifier id : ncIdentifiers) {
+ ncNames.add(id.getValue());
+ }
+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+ }
+ break;
+ }
+ // drop statements
+ case DATAVERSE_DROP: {
+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+ String dvName = stmtDelete.getDataverseName().getValue();
+ if (AsterixBuiltinArtifactMap.isSystemProtectedArtifact(ARTIFACT_KIND.DATAVERSE, dvName)) {
+ throw new AsterixException(" Invalid Operation cannot drop dataverse " + dvName
+ + " (protected by system)");
+ }
+
+ if (compiledDeclarations.isConnectedToDataverse())
+ compiledDeclarations.disconnectFromDataverse();
+ checkForDataverseConnection(false);
+
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
+ if (dv == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("\nThere is no dataverse with this name " + dvName + ".");
+ } else {
+ compiledDeclarations.connectToDataverse(dvName);
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
+ for (int j = 0; j < datasets.size(); j++) {
+ String datasetName = datasets.get(j).getDatasetName();
+ DatasetType dsType = datasets.get(j).getType();
+ if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName,
+ datasetName);
+ for (int k = 0; k < indexes.size(); k++) {
+ if (indexes.get(k).isSecondaryIndex()) {
+ compileIndexDropStatement(mdTxnCtx, datasetName, indexes.get(k).getIndexName());
+ }
+ }
+ }
+ compileDatasetDropStatement(mdTxnCtx, datasetName);
+ }
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
+ if (compiledDeclarations.isConnectedToDataverse())
+ compiledDeclarations.disconnectFromDataverse();
+ }
+ break;
+ }
+ case DATASET_DROP: {
+ checkForDataverseConnection(true);
+ DropStatement stmtDelete = (DropStatement) stmt;
+ String datasetName = stmtDelete.getDatasetName().getValue();
+ if (AsterixBuiltinArtifactMap.isSystemProtectedArtifact(ARTIFACT_KIND.DATASET, datasetName)) {
+ throw new AsterixException(" Invalid Operation cannot drop dataset " + datasetName
+ + " (protected by system)");
+ }
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ datasetName);
+ if (ds == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("\nThere is no dataset with this name " + datasetName + ".");
+ } else {
+ if (ds.getType() == DatasetType.INTERNAL || ds.getType() == DatasetType.FEED) {
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+ compiledDeclarations.getDataverseName(), datasetName);
+ for (int j = 0; j < indexes.size(); j++) {
+ if (indexes.get(j).isPrimaryIndex()) {
+ compileIndexDropStatement(mdTxnCtx, datasetName, indexes.get(j).getIndexName());
+ }
+ }
+ }
+ compileDatasetDropStatement(mdTxnCtx, datasetName);
+ }
+ break;
+ }
+ case INDEX_DROP: {
+ checkForDataverseConnection(true);
+ IndexDropStatement stmtDelete = (IndexDropStatement) stmt;
+ String datasetName = stmtDelete.getDatasetName().getValue();
+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ datasetName);
+ if (ds == null)
+ throw new AlgebricksException("\nThere is no dataset with this name " + datasetName + ".");
+ if (ds.getType() == DatasetType.INTERNAL || ds.getType() == DatasetType.FEED) {
+ String indexName = stmtDelete.getIndexName().getValue();
+ Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx,
+ compiledDeclarations.getDataverseName(), datasetName, indexName);
+ if (idx == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("\nThere is no index with this name " + indexName + ".");
+ } else
+ compileIndexDropStatement(mdTxnCtx, datasetName, indexName);
+ } else {
+ throw new AlgebricksException(datasetName
+ + " is an external dataset. Indexes are not maintained for external datasets.");
+ }
+ break;
+ }
+ case TYPE_DROP: {
+ checkForDataverseConnection(true);
+ TypeDropStatement stmtDelete = (TypeDropStatement) stmt;
+ String typeName = stmtDelete.getTypeName().getValue();
+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, compiledDeclarations
+ .getDataverseName(), typeName);
+ if (dt == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("\nThere is no datatype with this name " + typeName + ".");
+ } else
+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ typeName);
+ break;
+ }
+ case NODEGROUP_DROP: {
+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
+ if (AsterixBuiltinArtifactMap.isSystemProtectedArtifact(ARTIFACT_KIND.NODEGROUP, nodegroupName)) {
+ throw new AsterixException(" Invalid Operation cannot drop nodegroup " + nodegroupName
+ + " (protected by system)");
+ }
+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
+ if (ng == null) {
+ if (!stmtDelete.getIfExists())
+ throw new AlgebricksException("\nThere is no nodegroup with this name " + nodegroupName
+ + ".");
+ } else
+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
+ break;
+ }
+
+ case CREATE_FUNCTION: {
+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+ Function function = new Function(compiledDeclarations.getDataverseName(), cfs
+ .getFunctionIdentifier().getValue(), cfs.getFunctionIdentifier().getArity(), cfs
+ .getParamList(), cfs.getFunctionBody());
+ try {
+ FunctionUtil.getFunctionDecl(function);
+ } catch (Exception e) {
+ throw new AsterixException("unable to compile function definition", e);
+ }
+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, new Function(
+ compiledDeclarations.getDataverseName(), cfs.getFunctionIdentifier().getValue(), cfs
+ .getFunctionIdentifier().getArity(), cfs.getParamList(), cfs.getFunctionBody()));
+ break;
+ }
+
+ case FUNCTION_DROP: {
+ checkForDataverseConnection(true);
+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+ String functionName = stmtDropFunction.getFunctionName().getValue();
+ FunctionIdentifier fId = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, functionName,
+ stmtDropFunction.getArity(), false);
+ if (AsterixBuiltinArtifactMap.isSystemProtectedArtifact(ARTIFACT_KIND.FUNCTION, fId)) {
+ throw new AsterixException(" Invalid Operation cannot drop function " + functionName
+ + " (protected by system)");
+ }
+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, compiledDeclarations
+ .getDataverseName(), functionName, stmtDropFunction.getArity());
+ if (function == null) {
+ if (!stmtDropFunction.getIfExists())
+ throw new AlgebricksException("\nThere is no function with this name " + functionName + ".");
+ } else {
+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, compiledDeclarations.getDataverseName(),
+ functionName, stmtDropFunction.getArity());
+ }
+ break;
+ }
+ }
+ }
+
+ if (disconnectFromDataverse) {
+ // disconnect the dataverse
+ if (compiledDeclarations.isConnectedToDataverse())
+ compiledDeclarations.disconnectFromDataverse();
+ }
+ }
+
+ /**
+  * Verifies that the current dataverse connection state matches what the caller requires.
+  *
+  * @param needConnection
+  *            {@code true} if a dataverse must currently be connected, {@code false} if no dataverse
+  *            may be connected
+  * @throws AlgebricksException
+  *             if the state reported by {@code compiledDeclarations} differs from {@code needConnection}
+  */
+ private void checkForDataverseConnection(boolean needConnection) throws AlgebricksException {
+     boolean connected = compiledDeclarations.isConnectedToDataverse();
+     if (connected == needConnection) {
+         return;
+     }
+     String message = needConnection ? "You need first to connect to a dataverse."
+             : "You need first to disconnect from the dataverse.";
+     throw new AlgebricksException(message);
+ }
+
+ /**
+  * Prints the job specification to standard output and then runs it as a one-element job array
+  * through {@code executeJobArray}, using this object's port, writer and display format.
+  *
+  * @param jobSpec
+  *            the job to print and execute
+  * @throws Exception
+  *             whatever {@code executeJobArray} throws
+  */
+ private void runJob(JobSpecification jobSpec) throws Exception {
+     System.out.println(jobSpec.toString());
+     JobSpecification[] singleJob = new JobSpecification[] { jobSpec };
+     executeJobArray(singleJob, pc.getPort(), out, pdf);
+ }
+
+ /**
+  * Runs the given job specifications one after another against a Hyracks instance on
+  * {@code localhost}.
+  * <p>
+  * Each specification has its maximum re-attempts set to 0, is created under
+  * {@code GlobalConfig.HYRACKS_APP_NAME}, started, and waited on before the next one is submitted.
+  *
+  * @param specs
+  *            the jobs to run, in order; each element is modified (max re-attempts set to 0)
+  * @param port
+  *            port of the Hyracks instance on localhost
+  * @param out
+  *            not used by this method
+  * @param pdf
+  *            not used by this method
+  * @throws Exception
+  *             if connecting, creating, starting or waiting for a job fails
+  */
+ public void executeJobArray(JobSpecification[] specs, int port, PrintWriter out, DisplayFormat pdf)
+         throws Exception {
+     IHyracksClientConnection connection = new HyracksConnection("localhost", port);
+
+     for (JobSpecification spec : specs) {
+         spec.setMaxReattempts(0);
+         JobId submitted = connection.createJob(GlobalConfig.HYRACKS_APP_NAME, spec);
+         connection.start(submitted);
+         connection.waitForCompletion(submitted);
+     }
+ }
+
+ /**
+  * Drops a dataset of the current dataverse.
+  * <p>
+  * For INTERNAL and FEED datasets the drop jobs produced by
+  * {@code DatasetOperations.createDropDatasetJobSpec} are run first; for every dataset type the
+  * dataset is then removed from the metadata.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used for the lookup and the removal
+  * @param datasetName
+  *            name of the dataset to drop; it is looked up in the metadata and dereferenced without
+  *            a null check
+  * @throws Exception
+  *             if the metadata access or one of the drop jobs fails
+  */
+ private void compileDatasetDropStatement(MetadataTransactionContext mdTxnCtx, String datasetName) throws Exception {
+     Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, compiledDeclarations.getDataverseName(),
+             datasetName);
+     DatasetType type = dataset.getType();
+     boolean internalOrFeed = type == DatasetType.INTERNAL || type == DatasetType.FEED;
+     if (internalOrFeed) {
+         CompiledDatasetDropStatement dropStmt = new CompiledDatasetDropStatement(datasetName);
+         JobSpecification[] dropJobs = DatasetOperations.createDropDatasetJobSpec(dropStmt, compiledDeclarations);
+         for (int i = 0; i < dropJobs.length; i++) {
+             runJob(dropJobs[i]);
+         }
+     }
+     MetadataManager.INSTANCE.dropDataset(mdTxnCtx, compiledDeclarations.getDataverseName(), datasetName);
+ }
+
+ /**
+  * Builds the initialization jobs for a dataset via
+  * {@code DatasetOperations.createInitializeDatasetJobSpec} and runs them in order.
+  *
+  * @param txnId
+  *            id of the metadata transaction, forwarded to the job builder
+  * @param datasetName
+  *            name of the dataset to initialize
+  * @throws Exception
+  *             if building or running a job fails
+  */
+ private void compileDatasetInitializeStatement(long txnId, String datasetName) throws Exception {
+     JobSpecification[] initJobs = DatasetOperations.createInitializeDatasetJobSpec(txnId, datasetName,
+             compiledDeclarations);
+     for (int i = 0; i < initJobs.length; i++) {
+         runJob(initJobs[i]);
+     }
+ }
+
+ /**
+  * @return the compiled metadata declarations object held by this instance
+  */
+ public AqlCompiledMetadataDeclarations getCompiledDeclarations() {
+     return this.compiledDeclarations;
+ }
+
+ /**
+  * Runs the job built by {@code IndexOperations.createSecondaryIndexDropJobSpec} for the given index
+  * and then removes the index from the metadata of the current dataverse.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used for the removal
+  * @param datasetName
+  *            dataset that owns the index
+  * @param indexName
+  *            index to drop
+  * @throws Exception
+  *             if the drop job or the metadata update fails
+  */
+ private void compileIndexDropStatement(MetadataTransactionContext mdTxnCtx, String datasetName, String indexName)
+         throws Exception {
+     CompiledIndexDropStatement dropStmt = new CompiledIndexDropStatement(datasetName, indexName);
+     JobSpecification dropJob = IndexOperations.createSecondaryIndexDropJobSpec(dropStmt, compiledDeclarations);
+     runJob(dropJob);
+     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, compiledDeclarations.getDataverseName(), datasetName, indexName);
+ }
+
+ private Map<String, IAType> computeTypes(MetadataTransactionContext mdTxnCtx, TypeDecl tDec)
+ throws AlgebricksException, MetadataException {
+ Map<String, IAType> typeMap = new HashMap<String, IAType>();
+ Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes = new HashMap<String, Map<ARecordType, List<Integer>>>();
+ Map<String, List<AbstractCollectionType>> incompleteItemTypes = new HashMap<String, List<AbstractCollectionType>>();
+ Map<String, List<String>> incompleteTopLevelTypeReferences = new HashMap<String, List<String>>();
+
+ firstPass(tDec, typeMap, incompleteFieldTypes, incompleteItemTypes, incompleteTopLevelTypeReferences);
+ secondPass(mdTxnCtx, typeMap, incompleteFieldTypes, incompleteItemTypes, incompleteTopLevelTypeReferences);
+
+ return typeMap;
+ }
+
+ /**
+  * Resolves the type references that the first pass could not resolve by looking each referenced
+  * name up in the metadata of the current dataverse.
+  * <p>
+  * Three kinds of pending references are processed, in this order: top-level declarations that are
+  * plain references, record fields, and collection item types.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used for the lookups
+  * @param typeMap
+  *            receives an entry for every pending top-level declaration
+  * @param incompleteFieldTypes
+  *            referenced type name -> record type -> positions of the fields waiting for that type
+  * @param incompleteItemTypes
+  *            referenced type name -> collection types waiting for that item type
+  * @param incompleteTopLevelTypeReferences
+  *            referenced type name -> names of the declarations that alias it
+  * @throws AlgebricksException
+  *             if a referenced type does not exist in the metadata
+  * @throws MetadataException
+  *             if a metadata lookup fails
+  */
+ private void secondPass(MetadataTransactionContext mdTxnCtx, Map<String, IAType> typeMap,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes,
+         Map<String, List<AbstractCollectionType>> incompleteItemTypes,
+         Map<String, List<String>> incompleteTopLevelTypeReferences) throws AlgebricksException, MetadataException {
+     // solve remaining top level references
+     for (Map.Entry<String, List<String>> pending : incompleteTopLevelTypeReferences.entrySet()) {
+         IAType t = lookupReferencedType(mdTxnCtx, pending.getKey());
+         for (String tname : pending.getValue()) {
+             typeMap.put(tname, t);
+         }
+     }
+     // solve remaining field type references
+     for (Map.Entry<String, Map<ARecordType, List<Integer>>> pending : incompleteFieldTypes.entrySet()) {
+         IAType t = lookupReferencedType(mdTxnCtx, pending.getKey());
+         // Walk the entries instead of calling get(recType): the record types used as keys have their
+         // field-type arrays modified below, so a second hash lookup would only be safe if
+         // ARecordType hashing ignores field types (not visible from here).
+         for (Map.Entry<ARecordType, List<Integer>> fieldsToFix : pending.getValue().entrySet()) {
+             IAType[] fldTypes = fieldsToFix.getKey().getFieldTypes();
+             for (Integer pos : fieldsToFix.getValue()) {
+                 if (fldTypes[pos] == null) {
+                     fldTypes[pos] = t;
+                 } else { // nullable: a union placeholder is already in the slot, fill in its index 1
+                     AUnionType nullableUnion = (AUnionType) fldTypes[pos];
+                     nullableUnion.setTypeAtIndex(t, 1);
+                 }
+             }
+         }
+     }
+     // solve remaining item type references
+     for (Map.Entry<String, List<AbstractCollectionType>> pending : incompleteItemTypes.entrySet()) {
+         IAType t = lookupReferencedType(mdTxnCtx, pending.getKey());
+         for (AbstractCollectionType act : pending.getValue()) {
+             act.setItemType(t);
+         }
+     }
+ }
+
+ /**
+  * Fetches a named datatype from the metadata of the current dataverse.
+  *
+  * @param mdTxnCtx
+  *            metadata transaction used for the lookup
+  * @param typeName
+  *            name of the referenced type
+  * @return the type stored under that name
+  * @throws AlgebricksException
+  *             if the metadata has no datatype with that name
+  * @throws MetadataException
+  *             if the metadata lookup fails
+  */
+ private IAType lookupReferencedType(MetadataTransactionContext mdTxnCtx, String typeName)
+         throws AlgebricksException, MetadataException {
+     Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, compiledDeclarations.getDataverseName(),
+             typeName);
+     if (dt == null) {
+         throw new AlgebricksException("Could not resolve type " + typeName);
+     }
+     return dt.getDatatype();
+ }
+
+ /**
+  * Builds the type described by a declaration as far as possible without consulting the metadata.
+  * <p>
+  * The resulting type is stored in {@code typeMap} under the declared name. If the declaration is a
+  * plain reference to a type that is neither builtin nor already in {@code typeMap}, nothing is stored
+  * and the reference is recorded in {@code incompleteTopLevelTypeReferences} instead.
+  *
+  * @param td
+  *            the type declaration
+  * @param typeMap
+  *            types computed so far; updated by this method
+  * @param incompleteFieldTypes
+  *            collects unresolved record field references
+  * @param incompleteItemTypes
+  *            collects unresolved collection item references
+  * @param incompleteTopLevelTypeReferences
+  *            collects unresolved top-level references
+  * @throws AlgebricksException
+  *             if the declared name is a builtin type name
+  * @throws IllegalStateException
+  *             if the type expression is of a kind not handled here
+  */
+ private void firstPass(TypeDecl td, Map<String, IAType> typeMap,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes,
+         Map<String, List<AbstractCollectionType>> incompleteItemTypes,
+         Map<String, List<String>> incompleteTopLevelTypeReferences) throws AlgebricksException {
+
+     TypeExpression definition = td.getTypeDef();
+     String declaredName = td.getIdent().getValue();
+     if (builtinTypeMap.get(declaredName) != null) {
+         throw new AlgebricksException("Cannot redefine builtin type " + declaredName + " .");
+     }
+
+     IAType computed;
+     switch (definition.getTypeKind()) {
+         case TYPEREFERENCE: {
+             TypeReferenceExpression reference = (TypeReferenceExpression) definition;
+             computed = solveTypeReference(reference, typeMap);
+             if (computed == null) {
+                 // Not known yet: remember it and leave typeMap untouched.
+                 addIncompleteTopLevelTypeReference(declaredName, reference, incompleteTopLevelTypeReferences);
+                 return;
+             }
+             break;
+         }
+         case RECORD:
+             computed = computeRecordType(declaredName, (RecordTypeDefinition) definition, typeMap,
+                     incompleteFieldTypes, incompleteItemTypes);
+             break;
+         case ORDEREDLIST:
+             computed = computeOrderedListType(declaredName, (OrderedListTypeDefinition) definition, typeMap,
+                     incompleteItemTypes, incompleteFieldTypes);
+             break;
+         case UNORDEREDLIST:
+             computed = computeUnorderedListType(declaredName, (UnorderedListTypeDefinition) definition, typeMap,
+                     incompleteItemTypes, incompleteFieldTypes);
+             break;
+         default:
+             throw new IllegalStateException();
+     }
+     typeMap.put(declaredName, computed);
+ }
+
+ /**
+  * Creates an ordered list type with the given name and fills in its item type from the definition.
+  * The list is created with a {@code null} item type, which {@code setCollectionItemType} then sets
+  * or records as pending.
+  *
+  * @param typeName
+  *            name for the new list type; callers in this file pass {@code null} for nested types
+  * @param oltd
+  *            the ordered list definition
+  * @param typeMap
+  *            types computed so far
+  * @param incompleteItemTypes
+  *            collects unresolved collection item references
+  * @param incompleteFieldTypes
+  *            collects unresolved record field references
+  * @return the new ordered list type
+  */
+ private AOrderedListType computeOrderedListType(String typeName, OrderedListTypeDefinition oltd,
+         Map<String, IAType> typeMap, Map<String, List<AbstractCollectionType>> incompleteItemTypes,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes) {
+     TypeExpression itemExpr = oltd.getItemTypeExpression();
+     AOrderedListType orderedList = new AOrderedListType(null, typeName);
+     setCollectionItemType(itemExpr, typeMap, incompleteItemTypes, incompleteFieldTypes, orderedList);
+     return orderedList;
+ }
+
+ /**
+  * Creates an unordered list type with the given name and fills in its item type from the
+  * definition. The list is created with a {@code null} item type, which
+  * {@code setCollectionItemType} then sets or records as pending.
+  *
+  * @param typeName
+  *            name for the new list type; callers in this file pass {@code null} for nested types
+  * @param ultd
+  *            the unordered list definition
+  * @param typeMap
+  *            types computed so far
+  * @param incompleteItemTypes
+  *            collects unresolved collection item references
+  * @param incompleteFieldTypes
+  *            collects unresolved record field references
+  * @return the new unordered list type
+  */
+ private AUnorderedListType computeUnorderedListType(String typeName, UnorderedListTypeDefinition ultd,
+         Map<String, IAType> typeMap, Map<String, List<AbstractCollectionType>> incompleteItemTypes,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes) {
+     TypeExpression itemExpr = ultd.getItemTypeExpression();
+     AUnorderedListType unorderedList = new AUnorderedListType(null, typeName);
+     setCollectionItemType(itemExpr, typeMap, incompleteItemTypes, incompleteFieldTypes, unorderedList);
+     return unorderedList;
+ }
+
+ /**
+  * Determines the item type described by {@code tExpr} and sets it on the given collection type.
+  * <p>
+  * Nested list and record definitions are computed recursively as anonymous types. A type reference
+  * that cannot be resolved yet is recorded in {@code incompleteItemTypes} and the collection's item
+  * type is left unset.
+  *
+  * @param tExpr
+  *            the item type expression
+  * @param typeMap
+  *            types computed so far
+  * @param incompleteItemTypes
+  *            collects unresolved collection item references
+  * @param incompleteFieldTypes
+  *            collects unresolved record field references
+  * @param act
+  *            the collection type whose item type is being set
+  * @throws IllegalStateException
+  *             if the type expression is of a kind not handled here
+  */
+ private void setCollectionItemType(TypeExpression tExpr, Map<String, IAType> typeMap,
+         Map<String, List<AbstractCollectionType>> incompleteItemTypes,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes, AbstractCollectionType act) {
+     IAType itemType;
+     switch (tExpr.getTypeKind()) {
+         case ORDEREDLIST:
+             itemType = computeOrderedListType(null, (OrderedListTypeDefinition) tExpr, typeMap,
+                     incompleteItemTypes, incompleteFieldTypes);
+             break;
+         case UNORDEREDLIST:
+             itemType = computeUnorderedListType(null, (UnorderedListTypeDefinition) tExpr, typeMap,
+                     incompleteItemTypes, incompleteFieldTypes);
+             break;
+         case RECORD:
+             itemType = computeRecordType(null, (RecordTypeDefinition) tExpr, typeMap, incompleteFieldTypes,
+                     incompleteItemTypes);
+             break;
+         case TYPEREFERENCE: {
+             TypeReferenceExpression reference = (TypeReferenceExpression) tExpr;
+             itemType = solveTypeReference(reference, typeMap);
+             if (itemType == null) {
+                 // Unknown for now: register the collection so the item type can be set later.
+                 addIncompleteCollectionTypeReference(act, reference, incompleteItemTypes);
+                 return;
+             }
+             break;
+         }
+         default:
+             throw new IllegalStateException();
+     }
+     act.setItemType(itemType);
+ }
+
+ private ARecordType computeRecordType(String typeName, RecordTypeDefinition rtd, Map<String, IAType> typeMap,
+ Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes,
+ Map<String, List<AbstractCollectionType>> incompleteItemTypes) {
+ List<String> names = rtd.getFieldNames();
+ int n = names.size();
+ String[] fldNames = new String[n];
+ IAType[] fldTypes = new IAType[n];
+ int i = 0;
+ for (String s : names) {
+ fldNames[i++] = s;
+ }
+ boolean isOpen = rtd.getRecordKind() == RecordKind.OPEN;
+ ARecordType recType = new ARecordType(typeName, fldNames, fldTypes, isOpen);
+ for (int j = 0; j < n; j++) {
+ TypeExpression texpr = rtd.getFieldTypes().get(j);
+ switch (texpr.getTypeKind()) {
+ case TYPEREFERENCE: {
+ TypeReferenceExpression tre = (TypeReferenceExpression) texpr;
+ IAType tref = solveTypeReference(tre, typeMap);
+ if (tref != null) {
+ if (!rtd.getNullableFields().get(j)) { // not nullable
+ fldTypes[j] = tref;
+ } else { // nullable
+ fldTypes[j] = makeUnionWithNull(null, tref);
+ }
+ } else {
+ addIncompleteFieldTypeReference(recType, j, tre, incompleteFieldTypes);
+ if (rtd.getNullableFields().get(j)) {
+ fldTypes[j] = makeUnionWithNull(null, null);
+ }
+ }
+ break;
+ }
+ case RECORD: {
+ RecordTypeDefinition recTypeDef2 = (RecordTypeDefinition) texpr;
+ IAType t2 = computeRecordType(null, recTypeDef2, typeMap, incompleteFieldTypes, incompleteItemTypes);
+ if (!rtd.getNullableFields().get(j)) { // not nullable
+ fldTypes[j] = t2;
+ } else { // nullable
+ fldTypes[j] = makeUnionWithNull(null, t2);
+ }
+ break;
+ }
+ case ORDEREDLIST: {
+ OrderedListTypeDefinition oltd = (OrderedListTypeDefinition) texpr;
+ IAType t2 = computeOrderedListType(null, oltd, typeMap, incompleteItemTypes, incompleteFieldTypes);
+ fldTypes[j] = (rtd.getNullableFields().get(j)) ? makeUnionWithNull(null, t2) : t2;
+ break;
+ }
+ case UNORDEREDLIST: {
+ UnorderedListTypeDefinition ultd = (UnorderedListTypeDefinition) texpr;
+ IAType t2 = computeUnorderedListType(null, ultd, typeMap, incompleteItemTypes, incompleteFieldTypes);
+ fldTypes[j] = (rtd.getNullableFields().get(j)) ? makeUnionWithNull(null, t2) : t2;
+ break;
+ }
+ default: {
+ throw new IllegalStateException();
+ }
+ }
+
+ }
+
+ return recType;
+ }
+
+ /**
+  * Creates a two-member union type: {@code BuiltinType.ANULL} at index 0 and the given type at
+  * index 1.
+  *
+  * @param unionTypeName
+  *            name passed to the union type constructor; may be {@code null}
+  * @param type
+  *            second member of the union; callers in this file pass {@code null} as a placeholder
+  * @return the new union type
+  */
+ private AUnionType makeUnionWithNull(String unionTypeName, IAType type) {
+     ArrayList<IAType> members = new ArrayList<IAType>(2);
+     members.add(BuiltinType.ANULL);
+     members.add(type);
+     AUnionType union = new AUnionType(members, unionTypeName);
+     return union;
+ }
+
+ /**
+  * Records that a collection type is waiting for the type named by {@code tre} as its item type.
+  *
+  * @param collType
+  *            the collection type whose item type is unresolved
+  * @param tre
+  *            the unresolved reference; its identifier is the map key
+  * @param incompleteItemTypes
+  *            referenced type name -> waiting collection types; updated by this method
+  */
+ private void addIncompleteCollectionTypeReference(AbstractCollectionType collType, TypeReferenceExpression tre,
+         Map<String, List<AbstractCollectionType>> incompleteItemTypes) {
+     String referencedName = tre.getIdent().getValue();
+     List<AbstractCollectionType> waiting = incompleteItemTypes.get(referencedName);
+     if (waiting != null) {
+         waiting.add(collType);
+         return;
+     }
+     // first collection waiting for this name
+     waiting = new LinkedList<AbstractCollectionType>();
+     waiting.add(collType);
+     incompleteItemTypes.put(referencedName, waiting);
+ }
+
+ /**
+  * Records that the field at {@code fldPosition} of {@code recType} is waiting for the type named by
+  * {@code tre}.
+  *
+  * @param recType
+  *            the record type that owns the field
+  * @param fldPosition
+  *            index of the field in the record's field type array
+  * @param tre
+  *            the unresolved reference; its identifier is the outer map key
+  * @param incompleteFieldTypes
+  *            referenced type name -> record type -> waiting field positions; updated by this method
+  */
+ private void addIncompleteFieldTypeReference(ARecordType recType, int fldPosition, TypeReferenceExpression tre,
+         Map<String, Map<ARecordType, List<Integer>>> incompleteFieldTypes) {
+     String referencedName = tre.getIdent().getValue();
+
+     Map<ARecordType, List<Integer>> byRecord = incompleteFieldTypes.get(referencedName);
+     if (byRecord == null) {
+         byRecord = new HashMap<ARecordType, List<Integer>>();
+         incompleteFieldTypes.put(referencedName, byRecord);
+     }
+
+     List<Integer> positions = byRecord.get(recType);
+     if (positions == null) {
+         positions = new ArrayList<Integer>();
+         byRecord.put(recType, positions);
+     }
+
+     positions.add(fldPosition);
+ }
+
+ /**
+  * Records that the declaration named {@code tdeclName} is a reference to the not-yet-resolved type
+  * named by {@code tre}.
+  *
+  * @param tdeclName
+  *            name of the declaration waiting for the referenced type
+  * @param tre
+  *            the unresolved reference; its identifier is the map key
+  * @param incompleteTopLevelTypeReferences
+  *            referenced type name -> waiting declaration names; updated by this method
+  */
+ private void addIncompleteTopLevelTypeReference(String tdeclName, TypeReferenceExpression tre,
+         Map<String, List<String>> incompleteTopLevelTypeReferences) {
+     String referencedName = tre.getIdent().getValue();
+     List<String> waiting = incompleteTopLevelTypeReferences.get(referencedName);
+     if (waiting != null) {
+         waiting.add(tdeclName);
+         return;
+     }
+     // first declaration waiting for this name
+     waiting = new LinkedList<String>();
+     waiting.add(tdeclName);
+     incompleteTopLevelTypeReferences.put(referencedName, waiting);
+ }
+
+ /**
+  * Resolves a type reference by name, checking the builtin types before the types computed so far.
+  *
+  * @param tre
+  *            the type reference
+  * @param typeMap
+  *            types computed so far
+  * @return the builtin type with that name if there is one, otherwise the entry of {@code typeMap}
+  *         for that name, which is {@code null} when the name is unknown
+  */
+ private IAType solveTypeReference(TypeReferenceExpression tre, Map<String, IAType> typeMap) {
+     String referencedName = tre.getIdent().getValue();
+     IAType builtin = builtinTypeMap.get(referencedName);
+     return builtin != null ? builtin : typeMap.get(referencedName);
+ }
+
+ /**
+  * Common interface of the compiled statement classes declared in this file.
+  */
+ public interface ICompiledStatement {
+
+     /**
+      * @return the statement kind this compiled statement corresponds to
+      */
+     Kind getKind();
+ }
+
+ /**
+  * Compiled form of a load-from-file statement: the target dataset, the file splits to read, an
+  * optional field delimiter, and whether the input is already sorted.
+  */
+ public static class CompiledLoadFromFileStatement implements ICompiledStatement, IParseFileSplitsDecl {
+     private final String datasetName;
+     private final FileSplit[] splits;
+     private final Character delimChar;
+     private final boolean alreadySorted;
+
+     /**
+      * @param datasetName
+      *            name of the dataset to load into
+      * @param splits
+      *            file splits to read; the array is stored as given, not copied
+      * @param delimChar
+      *            field delimiter, or {@code null} if the input is not in a delimited format
+      * @param alreadySorted
+      *            value returned by {@link #alreadySorted()}
+      */
+     public CompiledLoadFromFileStatement(String datasetName, FileSplit[] splits, Character delimChar,
+             boolean alreadySorted) {
+         this.datasetName = datasetName;
+         this.splits = splits;
+         this.delimChar = delimChar;
+         this.alreadySorted = alreadySorted;
+     }
+
+     /** @return name of the dataset to load into */
+     public String getDatasetName() {
+         return this.datasetName;
+     }
+
+     /** @return the file splits passed to the constructor (same array instance) */
+     @Override
+     public FileSplit[] getSplits() {
+         return this.splits;
+     }
+
+     /** @return the field delimiter, or {@code null} if none was given */
+     @Override
+     public Character getDelimChar() {
+         return this.delimChar;
+     }
+
+     /** @return the already-sorted flag passed to the constructor */
+     public boolean alreadySorted() {
+         return this.alreadySorted;
+     }
+
+     /** @return {@code true} exactly when a delimiter character was given */
+     @Override
+     public boolean isDelimitedFileFormat() {
+         return null != this.delimChar;
+     }
+
+     /** @return {@code Kind.LOAD_FROM_FILE} */
+     @Override
+     public Kind getKind() {
+         return Kind.LOAD_FROM_FILE;
+     }
+ }
+
+ /**
+  * Compiled form of a write-from-query-result statement: the target dataset, the query, and a
+  * variable counter value.
+  */
+ public static class CompiledWriteFromQueryResultStatement implements ICompiledStatement {
+
+     private final String datasetName;
+     private final Query query;
+     private final int varCounter;
+
+     /**
+      * @param datasetName
+      *            name of the target dataset
+      * @param query
+      *            the query held by this statement
+      * @param varCounter
+      *            value returned by {@link #getVarCounter()}
+      */
+     public CompiledWriteFromQueryResultStatement(String datasetName, Query query, int varCounter) {
+         this.datasetName = datasetName;
+         this.query = query;
+         this.varCounter = varCounter;
+     }
+
+     /** @return name of the target dataset */
+     public String getDatasetName() {
+         return this.datasetName;
+     }
+
+     /** @return the variable counter passed to the constructor */
+     public int getVarCounter() {
+         return this.varCounter;
+     }
+
+     /** @return the query passed to the constructor */
+     public Query getQuery() {
+         return this.query;
+     }
+
+     /** @return {@code Kind.WRITE_FROM_QUERY_RESULT} */
+     @Override
+     public Kind getKind() {
+         return Kind.WRITE_FROM_QUERY_RESULT;
+     }
+
+ }
+
+ /**
+  * Compiled form of a dataset drop statement; carries only the dataset name.
+  */
+ public static class CompiledDatasetDropStatement implements ICompiledStatement {
+     private final String datasetName;
+
+     /**
+      * @param datasetName
+      *            name of the dataset to drop
+      */
+     public CompiledDatasetDropStatement(String datasetName) {
+         this.datasetName = datasetName;
+     }
+
+     /** @return name of the dataset to drop */
+     public String getDatasetName() {
+         return this.datasetName;
+     }
+
+     /** @return {@code Kind.DATASET_DROP} */
+     @Override
+     public Kind getKind() {
+         return Kind.DATASET_DROP;
+     }
+ }
+
+ // added by yasser
+ /**
+  * Compiled form of a create-dataverse statement: the dataverse name and its format string.
+  */
+ public static class CompiledCreateDataverseStatement implements ICompiledStatement {
+     private final String dataverseName;
+     private final String format;
+
+     /**
+      * @param dataverseName
+      *            name of the dataverse to create
+      * @param format
+      *            format string returned by {@link #getFormat()}
+      */
+     public CompiledCreateDataverseStatement(String dataverseName, String format) {
+         this.dataverseName = dataverseName;
+         this.format = format;
+     }
+
+     /** @return name of the dataverse to create */
+     public String getDataverseName() {
+         return this.dataverseName;
+     }
+
+     /** @return the format string passed to the constructor */
+     public String getFormat() {
+         return this.format;
+     }
+
+     /** @return {@code Kind.CREATE_DATAVERSE} */
+     @Override
+     public Kind getKind() {
+         return Kind.CREATE_DATAVERSE;
+     }
+ }
+
+ /**
+  * Compiled form of a nodegroup drop statement; carries only the nodegroup name.
+  */
+ public static class CompiledNodeGroupDropStatement implements ICompiledStatement {
+     private final String nodeGroupName;
+
+     /**
+      * @param nodeGroupName
+      *            name of the nodegroup to drop
+      */
+     public CompiledNodeGroupDropStatement(String nodeGroupName) {
+         this.nodeGroupName = nodeGroupName;
+     }
+
+     /** @return name of the nodegroup to drop */
+     public String getNodeGroupName() {
+         return this.nodeGroupName;
+     }
+
+     /** @return {@code Kind.NODEGROUP_DROP} */
+     @Override
+     public Kind getKind() {
+         return Kind.NODEGROUP_DROP;
+     }
+ }
+
+ /**
+  * Compiled form of an index drop statement: the owning dataset and the index name.
+  */
+ public static class CompiledIndexDropStatement implements ICompiledStatement {
+     private final String datasetName;
+     private final String indexName;
+
+     /**
+      * @param datasetName
+      *            name of the dataset that owns the index
+      * @param indexName
+      *            name of the index to drop
+      */
+     public CompiledIndexDropStatement(String datasetName, String indexName) {
+         this.datasetName = datasetName;
+         this.indexName = indexName;
+     }
+
+     /** @return name of the dataset that owns the index */
+     public String getDatasetName() {
+         return this.datasetName;
+     }
+
+     /** @return name of the index to drop */
+     public String getIndexName() {
+         return this.indexName;
+     }
+
+     /** @return {@code Kind.INDEX_DROP} */
+     @Override
+     public Kind getKind() {
+         return Kind.INDEX_DROP;
+     }
+ }
+
+ /**
+  * Compiled form of a dataverse drop statement: the dataverse name and the statement's
+  * {@code ifExists} flag.
+  */
+ public static class CompiledDataverseDropStatement implements ICompiledStatement {
+     private final String dataverseName;
+     private final boolean ifExists;
+
+     /**
+      * @param dataverseName
+      *            name of the dataverse to drop
+      * @param ifExists
+      *            value returned by {@link #getIfExists()}
+      */
+     public CompiledDataverseDropStatement(String dataverseName, boolean ifExists) {
+         this.dataverseName = dataverseName;
+         this.ifExists = ifExists;
+     }
+
+     /** @return name of the dataverse to drop */
+     public String getDataverseName() {
+         return this.dataverseName;
+     }
+
+     /** @return the {@code ifExists} flag passed to the constructor */
+     public boolean getIfExists() {
+         return this.ifExists;
+     }
+
+     /** @return {@code Kind.DATAVERSE_DROP} */
+     @Override
+     public Kind getKind() {
+         return Kind.DATAVERSE_DROP;
+     }
+ }
+
+ /**
+  * Compiled form of a type drop statement; carries only the type name.
+  */
+ public static class CompiledTypeDropStatement implements ICompiledStatement {
+     private final String typeName;
+
+     /**
+      * @param typeName
+      *            name of the datatype to drop
+      */
+     public CompiledTypeDropStatement(String typeName) {
+         // The parameter used to be called nodeGroupName, which did not match what it holds.
+         this.typeName = typeName;
+     }
+
+     /** @return name of the datatype to drop */
+     public String getTypeName() {
+         return typeName;
+     }
+
+     /** @return {@code Kind.TYPE_DROP} */
+     @Override
+     public Kind getKind() {
+         return Kind.TYPE_DROP;
+     }
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixAppRuntimeContext.java
new file mode 100644
index 0000000..6320737
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixAppRuntimeContext.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.asterix.context;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Holder for the storage runtime objects created at start-up: the buffer cache, the file map manager
+ * and the index registry.
+ * <p>
+ * At most one instance exists at a time. It is created by {@link #initialize(INCApplicationContext)}
+ * and discarded by {@link #deinitialize()}. Neither of those methods is synchronized.
+ */
+public class AsterixAppRuntimeContext {
+    private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
+
+    private static AsterixAppRuntimeContext INSTANCE;
+
+    private IndexRegistry<IIndex> treeRegistry;
+    private IBufferCache bufferCache;
+    private IFileMapManager fileMapManager;
+    private INCApplicationContext ncAppContext;
+
+    private AsterixAppRuntimeContext() {
+    }
+
+    /**
+     * Creates and starts the instance unless one already exists, in which case this only logs a
+     * message.
+     *
+     * @param ncAppContext
+     *            application context used to obtain the I/O manager and the frame size
+     * @throws IOException
+     *             declared by the start-up code
+     * @throws AsterixRuntimeException
+     *             if {@code ncAppContext} is {@code null}
+     */
+    public static void initialize(INCApplicationContext ncAppContext) throws IOException {
+        if (INSTANCE != null) {
+            LOGGER.info("Asterix instance already initialized");
+            return;
+        }
+
+        // Publish the instance only once start() has succeeded. Otherwise a failed start-up would
+        // leave a half-initialized instance behind that makes every later initialize() call a no-op.
+        AsterixAppRuntimeContext context = new AsterixAppRuntimeContext();
+        context.ncAppContext = ncAppContext;
+        context.start();
+        INSTANCE = context;
+    }
+
+    /**
+     * Closes the buffer cache of the current instance and discards the instance. Does nothing if no
+     * instance exists.
+     */
+    public static void deinitialize() {
+        if (INSTANCE != null) {
+            INSTANCE.stop();
+            INSTANCE = null;
+        }
+    }
+
+    private void stop() {
+        bufferCache.close();
+    }
+
+    /**
+     * Creates the file map manager, the buffer cache and the index registry.
+     * <p>
+     * The page size is read from the system property named by
+     * {@code GlobalConfig.BUFFER_CACHE_PAGE_SIZE_PROPERTY}; if it is unset, not an integer, or negative,
+     * the frame size of the root context is used. The cache size is read from the system property
+     * named by {@code GlobalConfig.BUFFER_CACHE_SIZE_PROPERTY}; if it is unset, not an integer, or
+     * negative, {@code GlobalConfig.DEFAULT_BUFFER_CACHE_SIZE} is used.
+     */
+    private void start() throws IOException {
+        if (ncAppContext == null) {
+            throw new AsterixRuntimeException("NC Application Context has not been set.");
+        }
+        fileMapManager = new AsterixFileMapManager();
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+        IIOManager ioMgr = ncAppContext.getRootContext().getIOManager();
+
+        String pgsizeStr = System.getProperty(GlobalConfig.BUFFER_CACHE_PAGE_SIZE_PROPERTY);
+        int pgSize = -1;
+        if (pgsizeStr != null) {
+            try {
+                pgSize = Integer.parseInt(pgsizeStr);
+            } catch (NumberFormatException nfe) {
+                StringWriter sw = new StringWriter();
+                nfe.printStackTrace(new PrintWriter(sw, true));
+                sw.close();
+                GlobalConfig.ASTERIX_LOGGER.warning("Wrong buffer cache page size argument. Picking frame size ("
+                        + ncAppContext.getRootContext().getFrameSize() + ") instead. \n" + sw.toString() + "\n");
+            }
+        }
+        if (pgSize < 0) {
+            // by default, pick the frame size
+            pgSize = ncAppContext.getRootContext().getFrameSize();
+        }
+
+        int cacheSize = GlobalConfig.DEFAULT_BUFFER_CACHE_SIZE;
+        String cacheSizeStr = System.getProperty(GlobalConfig.BUFFER_CACHE_SIZE_PROPERTY);
+        if (cacheSizeStr != null) {
+            int cs = -1;
+            try {
+                cs = Integer.parseInt(cacheSizeStr);
+            } catch (NumberFormatException nfe) {
+                // The stack-trace string that used to be built here was never used; the message
+                // itself is unchanged.
+                GlobalConfig.ASTERIX_LOGGER.warning("Wrong buffer cache size argument. Picking default value ("
+                        + GlobalConfig.DEFAULT_BUFFER_CACHE_SIZE + ") instead.\n");
+            }
+            if (cs >= 0) {
+                cacheSize = cs;
+            }
+        }
+        // Report the chosen sizes through the class logger instead of standard output.
+        LOGGER.info("BC :" + pgSize + " cache " + cacheSize);
+        bufferCache = new BufferCache(ioMgr, allocator, prs, fileMapManager, pgSize, cacheSize, Integer.MAX_VALUE);
+        treeRegistry = new IndexRegistry<IIndex>();
+    }
+
+    /**
+     * @return the current instance, or {@code null} if none has been initialized
+     */
+    public static AsterixAppRuntimeContext getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * @return the buffer cache created at start-up
+     */
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+
+    /**
+     * @return the file map manager created at start-up
+     */
+    public IFileMapProvider getFileMapManager() {
+        return fileMapManager;
+    }
+
+    /**
+     * @return the index registry created at start-up
+     */
+    public IndexRegistry<IIndex> getTreeRegistry() {
+        return treeRegistry;
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixFileMapManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixFileMapManager.java
new file mode 100644
index 0000000..0aacc3d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixFileMapManager.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.asterix.context;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+
+public class AsterixFileMapManager implements IFileMapManager {
+
+ private static final long serialVersionUID = 1L;
+ private Map<Integer, String> id2nameMap = new HashMap<Integer, String>();
+ private Map<String, Integer> name2IdMap = new HashMap<String, Integer>();
+ private int idCounter = 0;
+
+ @Override
+ public FileReference lookupFileName(int fileId) throws HyracksDataException {
+ String fName = id2nameMap.get(fileId);
+ if (fName == null) {
+ throw new HyracksDataException("No mapping found for id: " + fileId);
+ }
+ return new FileReference(new File(fName));
+ }
+
+ @Override
+ public int lookupFileId(FileReference fileRef) throws HyracksDataException {
+ String fileName = fileRef.getFile().getAbsolutePath();
+ Integer fileId = name2IdMap.get(fileName);
+ if (fileId == null) {
+ throw new HyracksDataException("No mapping found for name: " + fileName);
+ }
+ return fileId;
+ }
+
+ @Override
+ public boolean isMapped(FileReference fileRef) {
+ String fileName = fileRef.getFile().getAbsolutePath();
+ return name2IdMap.containsKey(fileName);
+ }
+
+ @Override
+ public boolean isMapped(int fileId) {
+ return id2nameMap.containsKey(fileId);
+ }
+
+ @Override
+ public void unregisterFile(int fileId) throws HyracksDataException {
+ String fileName = id2nameMap.remove(fileId);
+ name2IdMap.remove(fileName);
+ }
+
+ @Override
+ public void registerFile(FileReference fileRef) throws HyracksDataException {
+ Integer fileId = idCounter++;
+ String fileName = fileRef.getFile().getAbsolutePath();
+ id2nameMap.put(fileId, fileName);
+ name2IdMap.put(fileName, fileId);
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixStorageManagerInterface.java b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixStorageManagerInterface.java
new file mode 100644
index 0000000..12d715d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixStorageManagerInterface.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.context;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+ /**
+  * Storage manager interface that hands out the buffer cache and the file map provider of the
+  * {@link AsterixAppRuntimeContext} returned by {@code AsterixAppRuntimeContext.getInstance()}.
+  * The task context argument of both methods is not used.
+  */
+ public class AsterixStorageManagerInterface implements IStorageManagerInterface {
+     private static final long serialVersionUID = 1L;
+
+     // final so the shared instance cannot be reassigned by other code; this matches the
+     // declaration of AsterixTreeRegistryProvider.INSTANCE.
+     public static final AsterixStorageManagerInterface INSTANCE = new AsterixStorageManagerInterface();
+
+     /**
+      * @param ctx task context (ignored)
+      * @return the buffer cache of the application runtime context
+      */
+     @Override
+     public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+         return AsterixAppRuntimeContext.getInstance().getBufferCache();
+     }
+
+     /**
+      * @param ctx task context (ignored)
+      * @return the file map manager of the application runtime context
+      */
+     @Override
+     public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+         return AsterixAppRuntimeContext.getInstance().getFileMapManager();
+     }
+ }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixTreeRegistryProvider.java b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixTreeRegistryProvider.java
new file mode 100644
index 0000000..80fd64a
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/context/AsterixTreeRegistryProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.context;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+
+public class AsterixTreeRegistryProvider implements IIndexRegistryProvider<IIndex> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final AsterixTreeRegistryProvider INSTANCE = new AsterixTreeRegistryProvider();
+
+ private AsterixTreeRegistryProvider() {
+ }
+
+ @Override
+ public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
+ return AsterixAppRuntimeContext.getInstance().getTreeRegistry();
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixClientDriver.java b/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixClientDriver.java
new file mode 100644
index 0000000..db04380
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixClientDriver.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.asterix.drivers;
+
+import java.io.FileReader;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.asterix.api.common.AsterixClientConfig;
+import edu.uci.ics.asterix.api.java.AsterixJavaClient;
+
+ /**
+  * Command-line driver that compiles the query stored in a file and, if requested, executes it.
+  */
+ public class AsterixClientDriver {
+
+     /**
+      * Parses the command line into an {@link AsterixClientConfig}, checks that exactly one
+      * argument (the query file) was supplied, compiles the query and executes it when the
+      * {@code execute} option evaluates to true.
+      *
+      * @param args command-line arguments
+      * @throws Exception if argument parsing, compilation or execution fails; on a parsing
+      *         failure the usage text is printed to standard error before rethrowing
+      */
+     public static void main(String args[]) throws Exception {
+         AsterixClientConfig acc = new AsterixClientConfig();
+         CmdLineParser cmdParser = new CmdLineParser(acc);
+         try {
+             cmdParser.parseArgument(args);
+         } catch (Exception e) {
+             cmdParser.printUsage(System.err);
+             throw e;
+         }
+
+         if (acc.getArguments().isEmpty()) {
+             System.err.println("Please specify the file containing the query.");
+             return;
+         }
+         if (acc.getArguments().size() > 1) {
+             System.err.print("Too many arguments. ");
+             System.err.println("Only the file contained the query needs to be specified.");
+             return;
+         }
+
+         // Boolean.valueOf replaces the deprecated Boolean constructors: it yields the same
+         // truth value without allocating a new wrapper object each time.
+         boolean exec = Boolean.valueOf(acc.execute);
+         boolean optimize = Boolean.valueOf(acc.optimize);
+         boolean onlyPhysical = Boolean.valueOf(acc.onlyPhysical);
+         // A binary runtime is needed when executing, or when only the Hyracks job is requested.
+         boolean createBinaryRuntime = exec || Boolean.valueOf(acc.hyracksJob);
+
+         AsterixJavaClient q = compileQuery(acc.getArguments().get(0), optimize, onlyPhysical,
+                 createBinaryRuntime);
+         if (exec) {
+             q.execute(acc.hyracksPort);
+         }
+     }
+
+     /**
+      * Opens the query file and compiles it with an {@link AsterixJavaClient}.
+      *
+      * @param filename path of the file holding the query
+      * @param optimize passed as the first argument of {@code compile}
+      * @param onlyPhysical passed as the fifth argument of {@code compile}
+      * @param createBinaryRuntime passed as both the sixth and seventh argument of {@code compile}
+      * @return the client holding the compiled query
+      * @throws Exception if the file cannot be opened or compilation fails
+      */
+     private static AsterixJavaClient compileQuery(String filename, boolean optimize, boolean onlyPhysical,
+             boolean createBinaryRuntime) throws Exception {
+         // NOTE(review): the reader is never closed here; whether AsterixJavaClient takes
+         // ownership of it (or still needs it during execute) is not visible -- confirm.
+         FileReader reader = new FileReader(filename);
+         AsterixJavaClient q = new AsterixJavaClient(reader);
+         q.compile(optimize, true, true, true, onlyPhysical, createBinaryRuntime, createBinaryRuntime);
+         return q;
+     }
+
+ }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixWebServer.java b/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixWebServer.java
new file mode 100644
index 0000000..c8d6967
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/drivers/AsterixWebServer.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.asterix.drivers;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import edu.uci.ics.asterix.api.http.servlet.APIServlet;
+
+ /**
+  * Starts a Jetty server on port 8080 that routes every path to the {@link APIServlet}.
+  */
+ public class AsterixWebServer {
+     /**
+      * Configures the server, starts it and blocks in {@code join()}.
+      *
+      * @param args unused
+      * @throws Exception if the server cannot be started or joined
+      */
+     public static void main(String args[]) throws Exception {
+         Server jetty = new Server(8080);
+
+         // Session-enabled servlet context mounted at the root path.
+         ServletContextHandler rootContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+         rootContext.setContextPath("/");
+         jetty.setHandler(rootContext);
+
+         ServletHolder apiHolder = new ServletHolder(new APIServlet());
+         rootContext.addServlet(apiHolder, "/*");
+
+         jetty.start();
+         jetty.join();
+     }
+ }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
new file mode 100644
index 0000000..d179eb6
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.Job;
+import edu.uci.ics.asterix.aql.translator.DdlTranslator.CompiledDatasetDropStatement;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.std.NoTupleSourceRuntimeFactory;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledLoadFromFileStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class DatasetOperations {
+
+ private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
+ .getPhysicalOptimizationConfig();
+
+ private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+
+ /**
+  * Creates the job specifications that drop the index files of a dataset.
+  * <p>
+  * One job is produced per secondary index, followed by a final job for the primary index
+  * (which is looked up under the dataset's own name). An external dataset yields an empty
+  * array.
+  *
+  * @param deleteStmt compiled drop statement naming the dataset
+  * @param metadata compiled metadata declarations used to resolve the dataset and its file splits
+  * @return the drop jobs, secondary indexes first and the primary index last
+  * @throws AlgebricksException if the dataset has no metadata entry
+  */
+ public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement deleteStmt,
+         AqlCompiledMetadataDeclarations metadata) throws AlgebricksException, HyracksDataException,
+         RemoteException, ACIDException, AsterixException {
+
+     String datasetName = deleteStmt.getDatasetName();
+     String datasetPath = metadata.getRelativePath(datasetName);
+
+     LOGGER.info("DROP DATASETPATH: " + datasetPath);
+
+     IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+     AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+     if (adecl == null) {
+         throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
+     }
+     if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+         return new JobSpecification[0];
+     }
+
+     List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
+
+     // One slot per secondary index plus the trailing slot for the primary index.
+     int numSecondary = (secondaryIndexes == null) ? 0 : secondaryIndexes.size();
+     JobSpecification[] specs = new JobSpecification[numSecondary + 1];
+
+     // first, drop indexes
+     int next = 0;
+     if (secondaryIndexes != null) {
+         for (AqlCompiledIndexDecl acid : secondaryIndexes) {
+             JobSpecification specSecondary = new JobSpecification();
+             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
+                     .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                             acid.getIndexName());
+             TreeIndexDropOperatorDescriptor secondaryBtreeDrop = new TreeIndexDropOperatorDescriptor(
+                     specSecondary, storageManager, btreeRegistryProvider, idxSplitsAndConstraint.first);
+             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specSecondary,
+                     secondaryBtreeDrop, idxSplitsAndConstraint.second);
+             // Register the drop operator as the job's root, as is done for the primary drop below.
+             specSecondary.addRoot(secondaryBtreeDrop);
+             specs[next++] = specSecondary;
+         }
+     }
+
+     JobSpecification specPrimary = new JobSpecification();
+     specs[specs.length - 1] = specPrimary;
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+     TreeIndexDropOperatorDescriptor primaryBtreeDrop = new TreeIndexDropOperatorDescriptor(specPrimary,
+             storageManager, btreeRegistryProvider, splitsAndConstraint.first);
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
+             splitsAndConstraint.second);
+
+     specPrimary.addRoot(primaryBtreeDrop);
+
+     return specs;
+ }
+
+ /**
+  * Creates the single job that initializes the primary B-tree of an internal or feed dataset.
+  * <p>
+  * The job connects a {@link NoTupleSourceRuntimeFactory} operator to a B-tree bulk-load
+  * operator over the dataset's file splits (presumably producing an empty index, since the
+  * source emits no tuples -- confirm against NoTupleSourceRuntimeFactory).
+  *
+  * @param txnId transaction id; not used by this method
+  * @param datasetName name of the dataset to initialize
+  * @param metadata compiled metadata declarations used to resolve the dataset, its type and splits
+  * @return a one-element array holding the initialization job
+  * @throws AsterixException if the dataset is unknown, is neither INTERNAL nor FEED, or if any
+  *         {@link AlgebricksException} is raised while assembling the job
+  */
+ public static JobSpecification[] createInitializeDatasetJobSpec(long txnId, String datasetName,
+         AqlCompiledMetadataDeclarations metadata) throws AsterixException {
+
+     AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+     if (compiledDatasetDecl == null) {
+         throw new AsterixException("Could not find dataset " + datasetName);
+     }
+     if (compiledDatasetDecl.getDatasetType() != DatasetType.INTERNAL
+             && compiledDatasetDecl.getDatasetType() != DatasetType.FEED) {
+         // The space before "of type" was missing, which glued it to the closing parenthesis.
+         throw new AsterixException("Cannot initialize dataset (" + datasetName + ") of type "
+                 + compiledDatasetDecl.getDatasetType());
+     }
+
+     ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+     IDataFormat format;
+     ISerializerDeserializer payloadSerde;
+     IBinaryComparatorFactory[] comparatorFactories;
+     ITypeTraits[] typeTraits;
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+
+     try {
+         format = metadata.getFormat();
+         payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+         comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(compiledDatasetDecl, metadata
+                 .getFormat().getBinaryComparatorFactoryProvider());
+         typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+         splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                 datasetName);
+
+     } catch (AlgebricksException e1) {
+         throw new AsterixException(e1);
+     }
+
+     ITreeIndexFrameFactory interiorFrameFactory = AqlMetadataProvider.createBTreeNSMInteriorFrameFactory(typeTraits);
+     ITreeIndexFrameFactory leafFrameFactory = AqlMetadataProvider.createBTreeNSMLeafFrameFactory(typeTraits);
+
+     IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+     JobSpecification spec = new JobSpecification();
+     RecordDescriptor recDesc;
+     try {
+         recDesc = computePayloadKeyRecordDescriptor(compiledDatasetDecl, payloadSerde, metadata.getFormat());
+         NoTupleSourceRuntimeFactory factory = new NoTupleSourceRuntimeFactory();
+         AlgebricksMetaOperatorDescriptor asterixOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+                 new IPushRuntimeFactory[] { factory }, new RecordDescriptor[] { recDesc });
+
+         // move key fields to front: input columns 1..numKeys hold the keys, column 0 the payload
+         List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                 .getPartitioningFunctions(compiledDatasetDecl);
+         int numKeys = partitioningFunctions.size();
+         int[] fieldPermutation = new int[numKeys + 1];
+         for (int i = 0; i < numKeys; i++) {
+             fieldPermutation[i] = i + 1;
+         }
+         fieldPermutation[numKeys] = 0;
+
+         TreeIndexBulkLoadOperatorDescriptor bulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                 storageManager, btreeRegistryProvider, splitsAndConstraint.first, interiorFrameFactory,
+                 leafFrameFactory, typeTraits, comparatorFactories, fieldPermutation,
+                 GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+
+         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixOp,
+                 splitsAndConstraint.second);
+         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, bulkLoad,
+                 splitsAndConstraint.second);
+
+         spec.connect(new OneToOneConnectorDescriptor(spec), asterixOp, 0, bulkLoad, 0);
+
+         spec.addRoot(bulkLoad);
+     } catch (AlgebricksException e) {
+         throw new AsterixException(e);
+     }
+
+     return new JobSpecification[] { spec };
+ }
+
+ /**
+  * Creates the jobs that load an internal or feed dataset through an external data adapter.
+  * <p>
+  * The returned list starts with the index-drop jobs produced by {@code dropDatasetIndexes}
+  * and ends with the load job: external scanner, an assign operator that evaluates the
+  * dataset's partitioning functions, and a B-tree bulk load. Unless the statement reports
+  * its input as already sorted, tuples are hash-partitioned on the keys and externally
+  * sorted before the bulk load; otherwise a partitioning-merging connector feeds the bulk
+  * load directly.
+  *
+  * @param loadStmt compiled load statement (dataset name, adapter, properties, sortedness)
+  * @param metadata compiled metadata declarations used to resolve the dataset, its type and splits
+  * @return the drop jobs followed by the load job
+  * @throws AsterixException if the dataset is unknown, is neither INTERNAL nor FEED, or if any
+  *         {@link AlgebricksException} is raised while assembling the jobs
+  */
+ @SuppressWarnings("unchecked")
+ public static List<Job> createLoadDatasetJobSpec(CompiledLoadFromFileStatement loadStmt,
+         AqlCompiledMetadataDeclarations metadata) throws AsterixException {
+
+     String datasetName = loadStmt.getDatasetName();
+
+     AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+     if (compiledDatasetDecl == null) {
+         throw new AsterixException("Could not find dataset " + datasetName);
+     }
+     if (compiledDatasetDecl.getDatasetType() != DatasetType.INTERNAL
+             && compiledDatasetDecl.getDatasetType() != DatasetType.FEED) {
+         // The space before "of type" was missing, which glued it to the closing parenthesis.
+         throw new AsterixException("Cannot load data into dataset (" + datasetName + ") of type "
+                 + compiledDatasetDecl.getDatasetType());
+     }
+
+     List<Job> jobSpecs = new ArrayList<Job>();
+     try {
+         jobSpecs.addAll(dropDatasetIndexes(datasetName, metadata));
+     } catch (AlgebricksException ae) {
+         throw new AsterixException(ae);
+     }
+
+     ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+
+     // All of these lookups fail the same way, so they share one try/catch.
+     IDataFormat format;
+     ISerializerDeserializer payloadSerde;
+     IBinaryHashFunctionFactory[] hashFactories;
+     IBinaryComparatorFactory[] comparatorFactories;
+     ITypeTraits[] typeTraits;
+     try {
+         format = metadata.getFormat();
+         payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+         hashFactories = DatasetUtils.computeKeysBinaryHashFunFactories(compiledDatasetDecl, metadata.getFormat()
+                 .getBinaryHashFunctionFactoryProvider());
+         comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(compiledDatasetDecl, metadata
+                 .getFormat().getBinaryComparatorFactoryProvider());
+         typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+     } catch (AlgebricksException e) {
+         throw new AsterixException(e);
+     }
+
+     JobSpecification spec = new JobSpecification();
+     IOperatorDescriptor scanner;
+     AlgebricksPartitionConstraint scannerPc;
+     RecordDescriptor recDesc;
+     try {
+         AqlCompiledExternalDatasetDetails add = new AqlCompiledExternalDatasetDetails(loadStmt.getAdapter(),
+                 loadStmt.getProperties());
+         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = AqlMetadataProvider
+                 .buildExternalDataScannerRuntime(spec, itemType, add, format);
+         scanner = p.first;
+         scannerPc = p.second;
+         recDesc = computePayloadKeyRecordDescriptor(compiledDatasetDecl, payloadSerde, metadata.getFormat());
+     } catch (AlgebricksException e) {
+         throw new AsterixException(e);
+     }
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, scanner, scannerPc);
+
+     AssignRuntimeFactory assign = makeAssignRuntimeFactory(compiledDatasetDecl);
+     AlgebricksMetaOperatorDescriptor asterixOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+             new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { recDesc });
+
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixOp, scannerPc);
+
+     // Keys occupy columns 1..numKeys of the assign output; column 0 is the payload record.
+     List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+             .getPartitioningFunctions(compiledDatasetDecl);
+     int numKeys = partitioningFunctions.size();
+     int[] keys = new int[numKeys];
+     for (int i = 0; i < numKeys; i++) {
+         keys[i] = i + 1;
+     }
+     int framesLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
+
+     ITreeIndexFrameFactory interiorFrameFactory = AqlMetadataProvider.createBTreeNSMInteriorFrameFactory(typeTraits);
+     ITreeIndexFrameFactory leafFrameFactory = AqlMetadataProvider.createBTreeNSMLeafFrameFactory(typeTraits);
+
+     IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+     // move key fields to front
+     int[] fieldPermutation = new int[numKeys + 1];
+     System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+     fieldPermutation[numKeys] = 0;
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+     try {
+         splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                 datasetName);
+     } catch (AlgebricksException e) {
+         throw new AsterixException(e);
+     }
+
+     FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+     StringBuilder sb = new StringBuilder();
+     for (int i = 0; i < fs.length; i++) {
+         sb.append(stringOf(fs[i])).append(' ');
+     }
+     LOGGER.info("LOAD into File Splits: " + sb.toString());
+
+     TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+             storageManager, btreeRegistryProvider, splitsAndConstraint.first, interiorFrameFactory,
+             leafFrameFactory, typeTraits, comparatorFactories, fieldPermutation,
+             GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
+             splitsAndConstraint.second);
+
+     spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, asterixOp, 0);
+
+     if (!loadStmt.alreadySorted()) {
+         // Unsorted input: repartition by key hash, then sort each partition before loading.
+         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+                 comparatorFactories, recDesc);
+         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sorter,
+                 splitsAndConstraint.second);
+
+         IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+                 new FieldHashPartitionComputerFactory(keys, hashFactories));
+
+         spec.connect(hashConn, asterixOp, 0, sorter, 0);
+         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+     } else {
+         // Sorted input: repartition with a merging connector and load without a sort operator.
+         IConnectorDescriptor sortMergeConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+                 new FieldHashPartitionComputerFactory(keys, hashFactories), keys, comparatorFactories);
+         spec.connect(sortMergeConn, asterixOp, 0, btreeBulkLoad, 0);
+     }
+
+     spec.addRoot(btreeBulkLoad);
+
+     jobSpecs.add(new Job(spec));
+     return jobSpecs;
+ }
+
+ /**
+  * Creates one drop job per index of the dataset: every secondary index first, then the
+  * primary index.
+  *
+  * @param datasetName name of the dataset whose indexes are dropped
+  * @param metadata compiled metadata declarations used to resolve the dataset and its file splits
+  * @return the drop jobs in the order described above
+  * @throws AlgebricksException if the dataset has no metadata entry
+  */
+ private static List<Job> dropDatasetIndexes(String datasetName, AqlCompiledMetadataDeclarations metadata)
+         throws AlgebricksException, MetadataException {
+
+     AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+     if (adecl == null) {
+         throw new AlgebricksException("DROP DATASET INDEXES: No metadata for dataset " + datasetName);
+     }
+
+     // Work on a private copy: appending the primary index to the list returned by
+     // getSecondaryIndexes could otherwise modify the dataset declaration's own list.
+     // A null result is treated as "no secondary indexes".
+     List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
+     List<AqlCompiledIndexDecl> indexes = new ArrayList<AqlCompiledIndexDecl>();
+     if (secondaryIndexes != null) {
+         indexes.addAll(secondaryIndexes);
+     }
+     indexes.add(DatasetUtils.getPrimaryIndex(adecl));
+
+     List<Job> specs = new ArrayList<Job>();
+     IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+     for (AqlCompiledIndexDecl acid : indexes) {
+         JobSpecification spec = new JobSpecification();
+         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
+                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, acid.getIndexName());
+         TreeIndexDropOperatorDescriptor indexDrop = new TreeIndexDropOperatorDescriptor(spec,
+                 storageManager, btreeRegistryProvider, idxSplitsAndConstraint.first);
+         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexDrop,
+                 idxSplitsAndConstraint.second);
+         // Register the drop operator as the job's root, as createDropDatasetJobSpec does
+         // for the primary index drop.
+         spec.addRoot(indexDrop);
+         specs.add(new Job(spec));
+     }
+     return specs;
+ }
+
+ /**
+  * Renders a file split as {@code <node name>:<local file>} for log output.
+  */
+ private static String stringOf(FileSplit fs) {
+     StringBuilder description = new StringBuilder();
+     description.append(fs.getNodeName());
+     description.append(":");
+     description.append(fs.getLocalFile().toString());
+     return description.toString();
+ }
+
+ /**
+  * Builds the assign runtime that evaluates the dataset's partitioning functions.
+  * The i-th function writes to output column {@code i + 1}; the projection list keeps
+  * column 0 followed by columns 1..numKeys.
+  */
+ private static AssignRuntimeFactory makeAssignRuntimeFactory(AqlCompiledDatasetDecl compiledDatasetDecl) {
+     List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> keyFunctions = DatasetUtils
+             .getPartitioningFunctions(compiledDatasetDecl);
+     int keyCount = keyFunctions.size();
+
+     IEvaluatorFactory[] evaluators = new IEvaluatorFactory[keyCount];
+     int[] targetColumns = new int[keyCount];
+     int[] projectedColumns = new int[keyCount + 1];
+     projectedColumns[0] = 0;
+
+     // Fill all three arrays in a single pass over the partitioning functions.
+     for (int k = 0; k < keyCount; k++) {
+         int column = k + 1;
+         evaluators[k] = keyFunctions.get(k).first;
+         targetColumns[k] = column;
+         projectedColumns[column] = column;
+     }
+     return new AssignRuntimeFactory(targetColumns, evaluators, projectedColumns);
+ }
+
+ /**
+  * Builds the record descriptor for tuples shaped as (payload, key 1, ..., key n): field 0
+  * uses the given payload serde and each following field uses the serde that the data
+  * format provides for the type of the corresponding partitioning function.
+  *
+  * @throws AlgebricksException if a key serde cannot be obtained
+  */
+ @SuppressWarnings("unchecked")
+ private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
+         ISerializerDeserializer payloadSerde, IDataFormat dataFormat) throws AlgebricksException {
+     List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> keyFunctions = DatasetUtils
+             .getPartitioningFunctions(compiledDatasetDecl);
+     ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[1 + keyFunctions.size()];
+     fieldSerdes[0] = payloadSerde;
+
+     int slot = 1;
+     for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> keyFunction : keyFunctions) {
+         fieldSerdes[slot] = dataFormat.getSerdeProvider().getSerializerDeserializer(keyFunction.third);
+         slot++;
+     }
+     return new RecordDescriptor(fieldSerdes);
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
new file mode 100644
index 0000000..ef2425d
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.file;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.feed.comm.AlterFeedMessage;
+import edu.uci.ics.asterix.feed.comm.FeedMessage;
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.asterix.feed.comm.IFeedMessage.MessageType;
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledFeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledControlFeedStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+
+/**
+ * Builds job specifications for feed control statements (ALTER, SUSPEND, RESUME, END). Each such
+ * statement is turned into a job that sends the corresponding message to the feed dataset.
+ */
+public class FeedOperations {
+
+    // Named after this class so that log records emitted here are attributed to FeedOperations.
+    private static final Logger LOGGER = Logger.getLogger(FeedOperations.class.getName());
+
+    /**
+     * Creates the job specification for a feed control statement.
+     *
+     * @param controlFeedStatement compiled control statement; its operation type selects the job
+     * @param datasetDecls metadata declarations used to resolve the target dataset
+     * @return the job specification that delivers the control message to the feed
+     * @throws AsterixException if the operation type is not ALTER, SUSPEND, RESUME or END, or if
+     *             building the message job fails
+     * @throws AlgebricksException declared for callers; the visible code wraps Algebricks failures
+     *             into {@link AsterixException}
+     */
+    public static JobSpecification buildControlFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
+            AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
+        switch (controlFeedStatement.getOperationType()) {
+            case ALTER:
+            case SUSPEND:
+            case RESUME:
+            case END: {
+                return createSendMessageToFeedJobSpec(controlFeedStatement, datasetDecls);
+            }
+            default: {
+                throw new AsterixException("Unknown Operation Type: " + controlFeedStatement.getOperationType());
+            }
+
+        }
+    }
+
+    /**
+     * Builds a two-operator job: a feed messenger operator connected one-to-one to a null sink,
+     * both placed under the messenger's partition constraint.
+     *
+     * @param controlFeedStatement statement naming the dataset and the control operation
+     * @param metadata metadata declarations used to look up the dataset
+     * @return the assembled job specification with the null sink as root
+     * @throws AsterixException if the dataset has no metadata, is not a FEED dataset, the operation
+     *             type is not recognized, or the messenger runtime cannot be built
+     */
+    private static JobSpecification createSendMessageToFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
+            AqlCompiledMetadataDeclarations metadata) throws AsterixException {
+        String datasetName = controlFeedStatement.getDatasetName().getValue();
+        String datasetPath = metadata.getRelativePath(datasetName);
+
+        LOGGER.info(" DATASETPATH: " + datasetPath);
+
+        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+        if (adecl == null) {
+            throw new AsterixException("FEED DATASET: No metadata for dataset " + datasetName);
+        }
+        if (adecl.getDatasetType() != DatasetType.FEED) {
+            throw new AsterixException("Operation not support for dataset type " + adecl.getDatasetType());
+        }
+
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor feedMessenger;
+        AlgebricksPartitionConstraint messengerPc;
+
+        // Translate the control operation into the message that is sent to the feed.
+        List<IFeedMessage> feedMessages = new ArrayList<IFeedMessage>();
+        switch (controlFeedStatement.getOperationType()) {
+            case SUSPEND:
+                feedMessages.add(new FeedMessage(MessageType.SUSPEND));
+                break;
+            case END:
+                feedMessages.add(new FeedMessage(MessageType.STOP));
+                break;
+            case RESUME:
+                feedMessages.add(new FeedMessage(MessageType.RESUME));
+                break;
+            case ALTER:
+                feedMessages.add(new AlterFeedMessage(controlFeedStatement.getProperties()));
+                break;
+            default:
+                // Fail loudly instead of building a job that would send no message at all.
+                throw new AsterixException("Unknown Operation Type: " + controlFeedStatement.getOperationType());
+        }
+
+        try {
+            Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = AqlMetadataProvider.buildFeedMessengerRuntime(
+                    spec, metadata, (AqlCompiledFeedDatasetDetails) adecl.getAqlCompiledDatasetDetails(),
+                    metadata.getDataverseName(), datasetName, feedMessages);
+            feedMessenger = p.first;
+            messengerPc = p.second;
+        } catch (AlgebricksException e) {
+            throw new AsterixException(e);
+        }
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+
+        // The sink shares the messenger's constraint so the one-to-one connector lines up.
+        NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+
+        spec.addRoot(nullSink);
+        return spec;
+
+    }
+
+    /**
+     * Creates an assign runtime that evaluates the dataset's partitioning functions, writing key i
+     * to column {@code i + 1} and projecting column 0 followed by all key columns. Not referenced
+     * by the other members of this class.
+     *
+     * @param compiledDatasetDecl dataset whose partitioning functions supply the evaluators
+     * @return the assign runtime factory
+     */
+    private static AssignRuntimeFactory makeAssignRuntimeFactory(AqlCompiledDatasetDecl compiledDatasetDecl) {
+        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl);
+        int numKeys = partitioningFunctions.size();
+        IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numKeys];
+
+        int index = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+            evalFactories[index++] = evalFactoryAndType.first;
+        }
+
+        int[] outColumns = new int[numKeys];
+        int[] projectionList = new int[numKeys + 1];
+        projectionList[0] = 0;
+
+        for (int i = 0; i < numKeys; i++) {
+            outColumns[i] = i + 1;
+            projectionList[i + 1] = i + 1;
+        }
+        return new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+    }
+
+    /**
+     * Builds a record descriptor whose first field uses the payload serde and whose remaining
+     * fields use the serdes of the dataset's partitioning key types, in order. Not referenced by
+     * the other members of this class.
+     *
+     * @param compiledDatasetDecl dataset whose partitioning functions define the key fields
+     * @param payloadSerde serializer/deserializer placed in field 0
+     * @param dataFormat supplies the serde provider used to look up each key type's serde
+     * @return a descriptor with {@code 1 + numKeys} fields
+     * @throws AlgebricksException propagated from the calls made while resolving the key serdes
+     */
+    @SuppressWarnings("unchecked")
+    private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
+            ISerializerDeserializer payloadSerde, IDataFormat dataFormat) throws AlgebricksException {
+
+        List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl);
+        int numKeys = partitioningFunctions.size();
+        ISerializerDeserializer[] recordFields = new ISerializerDeserializer[1 + numKeys];
+        recordFields[0] = payloadSerde;
+        int index = 0;
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = dataFormat.getSerdeProvider().getSerializerDeserializer(keyType);
+            recordFields[index + 1] = keySerde;
+            index++;
+        }
+        return new RecordDescriptor(recordFields);
+    }
+
+    /**
+     * Formats a file split as {@code nodeName:localFile}. Not referenced by the other members of
+     * this class.
+     */
+    private static String stringOf(FileSplit fs) {
+        return fs.getNodeName() + ":" + fs.getLocalFile().toString();
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
new file mode 100644
index 0000000..f9ac244
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -0,0 +1,860 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.aql.translator.DdlTranslator.CompiledIndexDropStatement;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
+import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryTokenizerFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
+import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class IndexOperations {
+
+ // Physical optimization settings, fetched once from OptimizationConfUtil during static
+ // initialization of this class.
+ private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
+ .getPhysicalOptimizationConfig();
+
+ // Logger named after this class.
+ private static final Logger LOGGER = Logger.getLogger(IndexOperations.class.getName());
+
+ /**
+  * Dispatches a create-index statement to the job builder for its index type.
+  *
+  * @param createIndexStmt compiled create-index statement; its index type selects the builder
+  * @param datasetDecls metadata declarations passed through to the builder
+  * @return the job specification that builds the requested secondary index
+  * @throws AsterixException if the index type is QGRAM (no builder is wired in yet) or is not
+  *             one of the handled types, or if the selected builder fails
+  * @throws AlgebricksException propagated from the selected builder
+  */
+ public static JobSpecification buildCreateIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
+         AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
+
+     switch (createIndexStmt.getIndexType()) {
+         case BTREE: {
+             return createBtreeIndexJobSpec(createIndexStmt, datasetDecls);
+         }
+
+         case RTREE: {
+             return createRtreeIndexJobSpec(createIndexStmt, datasetDecls);
+         }
+
+         case KEYWORD: {
+             return createKeywordIndexJobSpec(createIndexStmt, datasetDecls);
+         }
+
+         case QGRAM: {
+             // TODO: return createQgramIndexJobSpec(createIndexStmt, datasetDecls) once it exists.
+             // Fail explicitly here rather than falling through to the "unknown type" branch,
+             // because QGRAM is a known type that simply has no job builder yet.
+             throw new AsterixException("Unsupported Index Type: " + createIndexStmt.getIndexType());
+         }
+
+         default: {
+             throw new AsterixException("Unknown Index Type: " + createIndexStmt.getIndexType());
+         }
+
+     }
+ }
+
+ /**
+  * Assembles a single-operator job that drops a secondary index of a dataset.
+  *
+  * @param deleteStmt compiled drop statement naming the dataset and the index
+  * @param datasetDecls metadata declarations used to resolve the index's file splits and
+  *            partition constraint
+  * @return a job specification whose only (root) operator is the tree-index drop operator
+  * @throws AlgebricksException propagated from the split/constraint lookup or constraint setup
+  * @throws MetadataException declared for callers of this method
+  */
+ public static JobSpecification createSecondaryIndexDropJobSpec(CompiledIndexDropStatement deleteStmt,
+         AqlCompiledMetadataDeclarations datasetDecls) throws AlgebricksException, MetadataException {
+     final String dataset = deleteStmt.getDatasetName();
+     final String index = deleteStmt.getIndexName();
+
+     final JobSpecification jobSpec = new JobSpecification();
+     final IIndexRegistryProvider<IIndex> registryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     final IStorageManagerInterface storage = AsterixStorageManagerInterface.INSTANCE;
+
+     // Resolve where the index files live and which partitions the drop has to run on.
+     final Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplits = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, index);
+     final IFileSplitProvider splitProvider = indexSplits.first;
+     final AlgebricksPartitionConstraint constraint = indexSplits.second;
+
+     final TreeIndexDropOperatorDescriptor dropOp = new TreeIndexDropOperatorDescriptor(jobSpec, storage,
+             registryProvider, splitProvider);
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, dropOp, constraint);
+     jobSpec.addRoot(dropOp);
+
+     return jobSpec;
+ }
+
+ /**
+  * Builds the job that creates a secondary B-tree index on a dataset. The operators are
+  * connected one-to-one in this order: constant tuple source (dummy key provider), primary
+  * index search with unbounded low/high keys, assign (evaluates the secondary key fields),
+  * external sort, secondary index bulk load (job root).
+  *
+  * @param createIndexStmt compiled statement giving the dataset name, index name and key fields
+  * @param metadata metadata declarations used to resolve the dataset, its item type, the data
+  *            format and the file splits / partition constraints
+  * @return the assembled job specification
+  * @throws AsterixException if the dataset is EXTERNAL or the dummy key tuple cannot be
+  *             serialized
+  * @throws AlgebricksException if the dataset is unknown, or propagated from the metadata and
+  *             provider calls made while assembling the job
+  */
+ @SuppressWarnings("unchecked")
+ public static JobSpecification createBtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = new JobSpecification();
+
+ String datasetName = createIndexStmt.getDatasetName();
+ String secondaryIndexName = createIndexStmt.getIndexName();
+
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializer payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(itemType);
+
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
+ }
+
+ // Alias of compiledDatasetDecl; both names refer to the same declaration below.
+ AqlCompiledDatasetDecl srcCompiledDatasetDecl = compiledDatasetDecl;
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+ // ---------- START GENERAL BTREE STUFF
+ IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+ IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+ // ---------- END GENERAL BTREE STUFF
+
+ // ---------- START KEY PROVIDER OP
+
+ // TODO: should actually be empty tuple source
+ // build tuple containing low and high search keys
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+ DataOutput dos = tb.getDataOutput();
+
+ tb.reset();
+ try {
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ } // dummy field
+ tb.addFieldEndOffset();
+
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+ // The key provider is placed using the constraint of the primary index, which is looked up
+ // under the dataset's own name.
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+ keyProviderSplitsAndConstraint.second);
+
+ // ---------- END KEY PROVIDER OP
+
+ // ---------- START PRIMARY INDEX SCAN
+
+ // Primary records hold the primary keys followed by the payload, hence numPrimaryKeys + 1
+ // serdes and type traits, but comparators for the key fields only.
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ int i = 0;
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+ List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
+ .getPartitioningFunctions(srcCompiledDatasetDecl);
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : partitioningFunctions) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+ // The payload occupies the last field of the primary record.
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+ ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
+ .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
+ ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
+ .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
+
+ int[] lowKeyFields = null; // -infinity
+ int[] highKeyFields = null; // +infinity
+ RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+ storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryInteriorFrameFactory,
+ primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
+ highKeyFields, true, true, new BTreeDataflowHelperFactory());
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+ primarySplitsAndConstraint.second);
+
+ // ---------- END PRIMARY INDEX SCAN
+
+ // ---------- START ASSIGN OP
+
+ // Secondary index entries are laid out as the secondary keys followed by the primary keys;
+ // comparators and type traits cover all of those fields.
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ int numSecondaryKeys = secondaryKeyFields.size();
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+ IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys
+ + numPrimaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ // NOTE(review): the third argument (numPrimaryKeys) matches the position of the payload in
+ // the primary record; presumably it is the input column to read the record from - confirm.
+ evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
+ numPrimaryKeys);
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ // fill in serializers and comparators for primary index fields
+ for (i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryTypeTraits[i];
+ }
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+ // The assign output columns start right after the primary record's numPrimaryKeys + 1
+ // fields, i.e. at column numPrimaryKeys + 1.
+ int[] outColumns = new int[numSecondaryKeys];
+ int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ // Project the newly assigned secondary key columns first, then the primary key columns
+ // (0 .. numPrimaryKeys - 1); the payload column is not projected.
+ int projCount = 0;
+ for (i = 0; i < numSecondaryKeys; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ assignSplitsAndConstraint.second);
+
+ // ---------- END ASSIGN OP
+
+ // ---------- START EXTERNAL SORT OP
+
+ // Sort on every field of the secondary record, in field order.
+ int[] sortFields = new int[numSecondaryKeys + numPrimaryKeys];
+ for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ sortFields[i] = i;
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
+
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+ physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories,
+ secondaryRecDesc);
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+ sorterSplitsAndConstraint.second);
+
+ // ---------- END EXTERNAL SORT OP
+
+ // ---------- START SECONDARY INDEX BULK LOAD
+
+ ITreeIndexFrameFactory secondaryInteriorFrameFactory = AqlMetadataProvider
+ .createBTreeNSMInteriorFrameFactory(secondaryTypeTraits);
+ ITreeIndexFrameFactory secondaryLeafFrameFactory = AqlMetadataProvider
+ .createBTreeNSMLeafFrameFactory(secondaryTypeTraits);
+
+ // Identity permutation: sorted tuples are loaded with their fields in the same order.
+ int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
+ for (i = 0; i < numSecondaryKeys + numPrimaryKeys; i++)
+ fieldPermutation[i] = i;
+
+ // Unlike the operators above, the bulk load uses the splits and constraint of the
+ // secondary index itself.
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, secondaryIndexName);
+
+ // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
+ // NOTE(review): the hard-coded 0.7f below appears to be the bulk-load fill factor that the
+ // constant named above would supply - confirm.
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManager, treeRegistryProvider, secondarySplitsAndConstraint.first,
+ secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, new BTreeDataflowHelperFactory());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END SECONDARY INDEX BULK LOAD
+
+ // ---------- START CONNECT THE OPERATORS
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+
+ spec.addRoot(secondaryBulkLoadOp);
+
+ // ---------- END CONNECT THE OPERATORS
+
+ return spec;
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public static JobSpecification createRtreeIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+
+ JobSpecification spec = new JobSpecification();
+
+ String primaryIndexName = createIndexStmt.getDatasetName();
+ String secondaryIndexName = createIndexStmt.getIndexName();
+
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(primaryIndexName);
+ if (compiledDatasetDecl == null) {
+ throw new AsterixException("Could not find dataset " + primaryIndexName);
+ }
+ ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
+ ISerializerDeserializerProvider serdeProvider = metadata.getFormat().getSerdeProvider();
+ ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
+
+ if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
+ }
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+ // ---------- START GENERAL BTREE STUFF
+
+ IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+ IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+ // ---------- END GENERAL BTREE STUFF
+
+ // ---------- START KEY PROVIDER OP
+
+ // TODO: should actually be empty tuple source
+ // build tuple containing low and high search keys
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+ DataOutput dos = tb.getDataOutput();
+
+ tb.reset();
+ try {
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ } // dummy field
+ tb.addFieldEndOffset();
+
+ ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+ keyProviderSplitsAndConstraint.second);
+
+ // ---------- END KEY PROVIDER OP
+
+ // ---------- START PRIMARY INDEX SCAN
+
+ ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ int i = 0;
+ serdeProvider = metadata.getFormat().getSerdeProvider();
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ primaryRecFields[i] = keySerde;
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, OrderKind.ASC);
+ primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
+ primaryRecFields[numPrimaryKeys] = payloadSerde;
+ primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+ ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
+ .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
+ ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
+ .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
+
+ int[] lowKeyFields = null; // -infinity
+ int[] highKeyFields = null; // +infinity
+ RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+ storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryInteriorFrameFactory,
+ primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
+ highKeyFields, true, true, new BTreeDataflowHelperFactory());
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+ primarySplitsAndConstraint.second);
+
+ // ---------- END PRIMARY INDEX SCAN
+
+ // ---------- START ASSIGN OP
+
+ List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+ int numSecondaryKeys = secondaryKeyFields.size();
+
+ if (numSecondaryKeys != 1) {
+ throw new AsterixException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
+
+ IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+ if (spatialType == null) {
+ throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
+
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ int numNestedSecondaryKeyFields = dimension * 2;
+
+ IEvaluatorFactory[] evalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
+ numPrimaryKeys, dimension);
+
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numNestedSecondaryKeyFields];
+ IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+ IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+ for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, OrderKind.ASC);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+
+ // fill in serializers and comparators for primary index fields
+ for (i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecFields[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryTypeTraits[i];
+ }
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+ int[] outColumns = new int[numNestedSecondaryKeyFields];
+ int[] projectionList = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+ for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ outColumns[i] = numPrimaryKeys + i + 1;
+ }
+ int projCount = 0;
+ for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ projectionList[projCount++] = numPrimaryKeys + i + 1;
+ }
+ for (i = 0; i < numPrimaryKeys; i++) {
+ projectionList[projCount++] = i;
+ }
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ assignSplitsAndConstraint.second);
+
+ // ---------- END ASSIGN OP
+
+ // ---------- START SECONDARY INDEX BULK LOAD
+
+ /*
+ ITreeIndexFrameFactory secondaryInteriorFrameFactory = JobGenHelper.createRTreeNSMInteriorFrameFactory(
+ secondaryTypeTraits, numNestedSecondaryKeyFields);
+ ITreeIndexFrameFactory secondaryLeafFrameFactory = JobGenHelper.createRTreeNSMLeafFrameFactory(
+ secondaryTypeTraits, numNestedSecondaryKeyFields);
+ */
+
+ ITreeIndexFrameFactory secondaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(secondaryTypeTraits), valueProviderFactories);
+ ITreeIndexFrameFactory secondaryLeafFrameFactory = new RTreeNSMLeafFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(secondaryTypeTraits), valueProviderFactories);
+
+ int[] fieldPermutation = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+ for (i = 0; i < numNestedSecondaryKeyFields + numPrimaryKeys; i++)
+ fieldPermutation[i] = i;
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+
+ // GlobalConfig.DEFAULT_BTREE_FILL_FACTOR
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManager, treeRegistryProvider, secondarySplitsAndConstraint.first,
+ secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
+ secondaryComparatorFactories, fieldPermutation, 0.7f, new RTreeDataflowHelperFactory());
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+ secondarySplitsAndConstraint.second);
+
+ // ---------- END SECONDARY INDEX BULK LOAD
+
+ // ---------- START CONNECT THE OPERATORS
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+
+ spec.addRoot(secondaryBulkLoadOp);
+
+ // ---------- END CONNECT THE OPERATORS
+
+ return spec;
+
+ }
+
+ /**
+  * Builds the Hyracks job that bulk-loads a secondary inverted keyword index over a dataset.
+  * <p>
+  * Operators are chained one-to-one: dummy key provider, full scan of the primary B-tree, assign (extracts
+  * the fields to tokenize), tokenizer, external sort on &lt;token, primary key&gt;, secondary B-tree bulk load.
+  *
+  * @param createIndexStmt compiled statement supplying the dataset name, index name and key fields
+  * @param datasetDecls compiled metadata used to look up the dataset, its item type and its file splits
+  * @return the job specification, with the bulk-load operator added as its root
+  * @throws AsterixException if the dataset is missing or external, its primary key is composite, there is no
+  *             key field, or a key field is absent from the schema or differs in type from the others
+  * @throws AlgebricksException if one of the metadata or job-generation calls made here raises it
+  */
+ @SuppressWarnings("unchecked")
+ public static JobSpecification createKeywordIndexJobSpec(CompiledCreateIndexStatement createIndexStmt,
+         AqlCompiledMetadataDeclarations datasetDecls) throws AsterixException, AlgebricksException {
+
+     JobSpecification spec = new JobSpecification();
+
+     String primaryIndexName = createIndexStmt.getDatasetName();
+     String secondaryIndexName = createIndexStmt.getIndexName();
+
+     AqlCompiledDatasetDecl compiledDatasetDecl = datasetDecls.findDataset(primaryIndexName);
+     if (compiledDatasetDecl == null) {
+         throw new AsterixException("Could not find dataset " + primaryIndexName);
+     }
+
+     if (compiledDatasetDecl.getDatasetType() == DatasetType.EXTERNAL) {
+         throw new AsterixException("Cannot index an external dataset (" + primaryIndexName + ").");
+     }
+     ARecordType itemType = (ARecordType) datasetDecls.findType(compiledDatasetDecl.getItemTypeName());
+     ISerializerDeserializerProvider serdeProvider = datasetDecls.getFormat().getSerdeProvider();
+     ISerializerDeserializer payloadSerde = serdeProvider.getSerializerDeserializer(itemType);
+
+     int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(compiledDatasetDecl).size();
+
+     // sanity
+     if (numPrimaryKeys > 1) {
+         throw new AsterixException("Cannot create inverted keyword index on dataset with composite primary key.");
+     }
+
+     // sanity: there must be a field to tokenize, every field must exist, and all must share one type
+     List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
+     if (secondaryKeyFields.isEmpty()) {
+         throw new AsterixException("Cannot create inverted keyword index. No fields to tokenize.");
+     }
+     IAType fieldsToTokenizeType = null;
+     for (String fieldName : secondaryKeyFields) {
+         IAType nextFieldToTokenizeType = AqlCompiledIndexDecl.keyFieldType(fieldName, itemType);
+         if (nextFieldToTokenizeType == null) {
+             throw new AsterixException("Could not find field " + fieldName + " in the schema.");
+         }
+         if (fieldsToTokenizeType == null) {
+             fieldsToTokenizeType = nextFieldToTokenizeType;
+         } else if (nextFieldToTokenizeType.getTypeTag() != fieldsToTokenizeType.getTypeTag()) {
+             throw new AsterixException(
+                     "Cannot create inverted keyword index. Fields to tokenize must be of the same type.");
+         }
+     }
+
+     // ---------- START GENERAL BTREE STUFF
+     IIndexRegistryProvider<IIndex> treeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+     IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+     // ---------- END GENERAL BTREE STUFF
+
+     // ---------- START KEY PROVIDER OP
+     // TODO: should actually be empty tuple source
+     // build a one-field dummy tuple to feed the primary index search (no real search keys are used)
+     ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+     DataOutput dos = tb.getDataOutput();
+     try {
+         tb.reset();
+         IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); // dummy field
+         tb.addFieldEndOffset();
+     } catch (HyracksDataException e) {
+         throw new AsterixException(e);
+     }
+
+     ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+     RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> keyProviderSplitsAndConstraint = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+     ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+             keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+             keyProviderSplitsAndConstraint.second);
+     // ---------- END KEY PROVIDER OP
+
+     // ---------- START PRIMARY INDEX SCAN
+     // primary index tuples are <primary key fields, record payload>
+     ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+     IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+     ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1];
+     int i = 0;
+     for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+             .getPartitioningFunctions(compiledDatasetDecl)) {
+         IAType keyType = evalFactoryAndType.third;
+         ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+         primaryRecFields[i] = keySerde;
+         primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                 keyType, OrderKind.ASC);
+         primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+         ++i;
+     }
+     primaryRecFields[numPrimaryKeys] = payloadSerde;
+     primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+
+     ITreeIndexFrameFactory primaryInteriorFrameFactory = AqlMetadataProvider
+             .createBTreeNSMInteriorFrameFactory(primaryTypeTraits);
+     ITreeIndexFrameFactory primaryLeafFrameFactory = AqlMetadataProvider
+             .createBTreeNSMLeafFrameFactory(primaryTypeTraits);
+
+     int[] lowKeyFields = null; // -infinity
+     int[] highKeyFields = null; // +infinity
+     RecordDescriptor primaryRecDesc = new RecordDescriptor(primaryRecFields);
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+     BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+             storageManager, treeRegistryProvider, primarySplitsAndConstraint.first, primaryInteriorFrameFactory,
+             primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
+             highKeyFields, true, true, new BTreeDataflowHelperFactory());
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+             primarySplitsAndConstraint.second);
+     // ---------- END PRIMARY INDEX SCAN
+
+     // ---------- START ASSIGN OP
+     int numSecondaryKeys = secondaryKeyFields.size();
+     ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+     IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numSecondaryKeys];
+     for (i = 0; i < numSecondaryKeys; i++) {
+         evalFactories[i] = datasetDecls.getFormat().getFieldAccessEvaluatorFactory(itemType,
+                 secondaryKeyFields.get(i), numPrimaryKeys);
+         IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+         ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+         secondaryRecFields[i] = keySerde;
+     }
+     // fill in serializers for primary index fields
+     for (i = 0; i < numPrimaryKeys; i++) {
+         secondaryRecFields[numSecondaryKeys + i] = primaryRecFields[i];
+     }
+     RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+
+     int[] outColumns = new int[numSecondaryKeys];
+     int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+     // extracted fields land after <primary keys, payload>; project them first, then the primary keys
+     for (i = 0; i < numSecondaryKeys; i++) {
+         outColumns[i] = numPrimaryKeys + i + 1;
+         projectionList[i] = outColumns[i];
+     }
+     for (i = 0; i < numPrimaryKeys; i++) {
+         projectionList[numSecondaryKeys + i] = i;
+     }
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> assignSplitsAndConstraint = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+     AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+     AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+             new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+             assignSplitsAndConstraint.second);
+     // ---------- END ASSIGN OP
+
+     // ---------- START TOKENIZER OP
+     int numTokenKeyPairFields = numPrimaryKeys + 1;
+
+     ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+     tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(fieldsToTokenizeType);
+     for (i = 0; i < numPrimaryKeys; i++)
+         tokenKeyPairFields[i + 1] = secondaryRecFields[numSecondaryKeys + i];
+     RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields);
+
+     int[] fieldsToTokenize = new int[numSecondaryKeys];
+     for (i = 0; i < numSecondaryKeys; i++)
+         fieldsToTokenize[i] = i;
+
+     int[] primaryKeyFields = new int[numPrimaryKeys];
+     for (i = 0; i < numPrimaryKeys; i++)
+         primaryKeyFields[i] = numSecondaryKeys + i;
+
+     IBinaryTokenizerFactory tokenizerFactory = AqlBinaryTokenizerFactoryProvider.INSTANCE
+             .getTokenizerFactory(fieldsToTokenizeType);
+     BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
+             tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, secondaryIndexName);
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
+             secondarySplitsAndConstraint.second);
+     // ---------- END TOKENIZER OP
+
+     // ---------- START EXTERNAL SORT OP
+     IBinaryComparatorFactory[] tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
+     tokenKeyPairComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+             fieldsToTokenizeType, OrderKind.ASC);
+     for (i = 0; i < numPrimaryKeys; i++)
+         tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
+
+     // sort on every field: <token, primary key a, primary key b, etc.>
+     int[] sortFields = new int[numTokenKeyPairFields];
+     for (i = 0; i < numTokenKeyPairFields; i++)
+         sortFields[i] = i;
+
+     Pair<IFileSplitProvider, AlgebricksPartitionConstraint> sorterSplitsAndConstraint = datasetDecls
+             .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(primaryIndexName, primaryIndexName);
+
+     // the sorter consumes tokenizer output, so it is described by the <token, primary key> descriptor
+     ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+             physicalOptimizationConfig.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories,
+             tokenKeyPairRecDesc);
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
+             sorterSplitsAndConstraint.second);
+     // ---------- END EXTERNAL SORT OP
+
+     // ---------- START SECONDARY INDEX BULK LOAD
+     ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+     secondaryTypeTraits[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(fieldsToTokenizeType);
+     for (i = 0; i < numPrimaryKeys; i++)
+         secondaryTypeTraits[i + 1] = primaryTypeTraits[i];
+
+     ITreeIndexFrameFactory secondaryInteriorFrameFactory = AqlMetadataProvider
+             .createBTreeNSMInteriorFrameFactory(secondaryTypeTraits);
+     ITreeIndexFrameFactory secondaryLeafFrameFactory = AqlMetadataProvider
+             .createBTreeNSMLeafFrameFactory(secondaryTypeTraits);
+
+     // identity permutation with one entry per <token, primary key> field, matching secondaryTypeTraits
+     int[] fieldPermutation = new int[numTokenKeyPairFields];
+     for (i = 0; i < numTokenKeyPairFields; i++)
+         fieldPermutation[i] = i;
+
+     TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
+             storageManager, treeRegistryProvider, secondarySplitsAndConstraint.first,
+             secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
+             tokenKeyPairComparatorFactories, fieldPermutation, 0.7f, new BTreeDataflowHelperFactory());
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBulkLoadOp,
+             secondarySplitsAndConstraint.second);
+     // ---------- END SECONDARY INDEX BULK LOAD
+
+     // ---------- START CONNECT THE OPERATORS
+     // linear pipeline: key provider -> primary scan -> assign -> tokenizer -> sort -> bulk load
+     spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, asterixAssignOp, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
+     spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+
+     spec.addRoot(secondaryBulkLoadOp);
+     // ---------- END CONNECT THE OPERATORS
+
+     return spec;
+ }
+
+ /**
+  * Command-line entry point that resolves the Hyracks host, application name and DDL file.
+  * <p>
+  * No arguments: built-in defaults are used and echoed. Three arguments: taken positionally. Any other
+  * count: the usage text is printed and the JVM exits with status 0. The resolved values are not consumed
+  * yet because the driver code at the end of the method is commented out.
+  *
+  * @param args empty, or the host, application name and DDL file path in that order
+  * @throws Exception declared by the signature; the current body throws no checked exception
+  */
+ public static void main(String[] args) throws Exception {
+     final int argCount = args.length;
+     String host = null;
+     String appName = null;
+     String ddlFile = null;
+
+     if (argCount == 0) {
+         host = "127.0.0.1";
+         appName = "asterix";
+         ddlFile = "/home/abehm/workspace/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
+         System.out.println("No arguments specified, using defauls:");
+         System.out.println("HYRACKS HOST: " + host);
+         System.out.println("APPNAME: " + appName);
+         System.out.println("DDLFILE: " + ddlFile);
+     } else if (argCount == 3) {
+         host = args[0];
+         appName = args[1];
+         ddlFile = args[2];
+     } else {
+         // unexpected argument count: explain the expected arguments and stop
+         System.out.println("USAGE:");
+         System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
+         System.out.println("ARG 2: Application Name (e.g., asterix)");
+         System.out.println("ARG 3: DDL File");
+         System.exit(0);
+     }
+
+     // Disabled driver code, kept for reference; host, appName and ddlFile are unused until it is restored.
+     // int port = HyracksIntegrationUtil.DEFAULT_HYRACKS_CC_PORT;
+
+     // AsterixJavaClient q = compileQuery(ddlFile, true, false, true);
+
+     // long start = System.currentTimeMillis();
+     // q.execute(port);
+     // long end = System.currentTimeMillis();
+     // System.err.println(start + " " + end + " " + (end - start));
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
new file mode 100644
index 0000000..91af8de
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestKeywordIndexJob.java
@@ -0,0 +1,235 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class TestKeywordIndexJob {
+
+ private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+ static {
+ typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+ }
+
+ public static int DEFAULT_INPUT_DATA_COLUMN = 0;
+ public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
+
+ @SuppressWarnings("unchecked")
+ public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
+
+ JobSpecification spec = new JobSpecification();
+
+ // ---------- START GENERAL BTREE STUFF
+
+ IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+ IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+
+ // ---------- END GENERAL BTREE STUFF
+
+ List<String> nodeGroup = new ArrayList<String>();
+ nodeGroup.add("nc1");
+ nodeGroup.add("nc2");
+
+ // ---------- START KEY PROVIDER OP
+
+ // TODO: should actually be empty tuple source
+ // build tuple containing low and high search keys
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1); // just one dummy field
+ DataOutput dos = tb.getDataOutput();
+
+ tb.reset();
+ AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos); // dummy
+ // field
+ tb.addFieldEndOffset();
+
+ ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ String[] keyProviderOpLocationConstraint = new String[nodeGroup.size()];
+ for (int p = 0; p < nodeGroup.size(); p++) {
+ keyProviderOpLocationConstraint[p] = nodeGroup.get(p);
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, keyProviderOpLocationConstraint);
+
+ // ---------- END KEY PROVIDER OP
+
+ // ---------- START SECONRARY INDEX SCAN
+
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[2];
+ secondaryTypeTraits[0] = new ITypeTraits() {
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+ };
+
+ secondaryTypeTraits[1] = new ITypeTraits() {
+
+ @Override
+ public boolean isFixedLength() {
+ return true;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return 5;
+ }
+ };
+
+ ITreeIndexFrameFactory interiorFrameFactory = AqlMetadataProvider
+ .createBTreeNSMInteriorFrameFactory(secondaryTypeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = AqlMetadataProvider.createBTreeNSMLeafFrameFactory(secondaryTypeTraits);
+
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[2];
+ secondaryRecFields[0] = AObjectSerializerDeserializer.INSTANCE;
+ secondaryRecFields[1] = AObjectSerializerDeserializer.INSTANCE;
+ IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
+ secondaryComparatorFactories[0] = AObjectAscBinaryComparatorFactory.INSTANCE;
+ secondaryComparatorFactories[1] = AObjectAscBinaryComparatorFactory.INSTANCE;
+
+ int[] lowKeyFields = null;
+ int[] highKeyFields = null;
+ RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+ // TODO: change file splits according to mount points in cluster config
+ IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameInvIndex"))),
+ new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameInvIndex"))) });
+ BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ storageManager, btreeRegistryProvider, secondarySplitProvider, interiorFrameFactory, leafFrameFactory,
+ secondaryTypeTraits, secondaryComparatorFactories, true, lowKeyFields, highKeyFields, true, true,
+ new BTreeDataflowHelperFactory());
+ String[] secondarySearchOpLocationConstraint = new String[nodeGroup.size()];
+ for (int p = 0; p < nodeGroup.size(); p++) {
+ secondarySearchOpLocationConstraint[p] = nodeGroup.get(p);
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp,
+ secondarySearchOpLocationConstraint);
+
+ // ---------- END SECONDARY INDEX SCAN
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ String[] printerLocationConstraint = new String[nodeGroup.size()];
+ for (int p = 0; p < nodeGroup.size(); p++) {
+ printerLocationConstraint[p] = nodeGroup.get(p);
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, printerLocationConstraint);
+
+ // ---------- START CONNECT THE OPERATORS
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
+
+ // ---------- END CONNECT THE OPERATORS
+
+ spec.addRoot(printer);
+
+ return spec;
+ }
+
+ public static void main(String[] args) throws Exception {
+ String host;
+ String appName;
+ String ddlFile;
+
+ switch (args.length) {
+ case 0: {
+ host = "127.0.0.1";
+ appName = "asterix";
+ ddlFile = "/home/abehm/workspace/asterix/src/test/resources/demo0927/local/create-index.aql";
+ System.out.println("No arguments specified, using defauls:");
+ System.out.println("HYRACKS HOST: " + host);
+ System.out.println("APPNAME: " + appName);
+ System.out.println("DDLFILE: " + ddlFile);
+ }
+ break;
+
+ case 3: {
+ host = args[0];
+ appName = args[1];
+ ddlFile = args[2];
+ }
+ break;
+
+ default: {
+ System.out.println("USAGE:");
+ System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
+ System.out.println("ARG 2: Application Name (e.g., asterix)");
+ System.out.println("ARG 3: DDL File");
+ host = null;
+ appName = null;
+ ddlFile = null;
+ System.exit(0);
+ }
+ break;
+ }
+
+ int port = 1098;
+ IHyracksClientConnection hcc = new HyracksConnection(host, port);
+
+ TestKeywordIndexJob tij = new TestKeywordIndexJob();
+ JobSpecification jobSpec = tij.createJobSpec();
+ JobId jobId = hcc.createJob("asterix", jobSpec);
+
+ long start = System.currentTimeMillis();
+ hcc.start(jobId);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
new file mode 100644
index 0000000..025fa16
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/TestSecondaryIndexJob.java
@@ -0,0 +1,237 @@
+package edu.uci.ics.asterix.file;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.context.AsterixStorageManagerInterface;
+import edu.uci.ics.asterix.context.AsterixTreeRegistryProvider;
+import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AObjectSerializerDeserializer;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+/**
+ * Stand-alone driver that scans the secondary B-tree index
+ * {@code Customers_idx_NameBtreeIndex} on node controllers {@code nc1} and
+ * {@code nc2} and sends every tuple found to a printer operator.
+ * <p>
+ * Usage: no arguments (built-in defaults), or
+ * {@code <hyracks-host> <application-name> <ddl-file>}.
+ */
+public class TestSecondaryIndexJob {
+
+    // Value parsers per ADM type tag; populated here but not read by this driver.
+    private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+    static {
+        typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+        typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+    }
+
+    // Public mutable statics kept unchanged for external users; not read in this class.
+    public static int DEFAULT_INPUT_DATA_COLUMN = 0;
+    public static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
+
+    /** Port handed to {@link HyracksConnection} by {@link #main}. */
+    private static final int HYRACKS_PORT = 1098;
+
+    /**
+     * Assembles the job: a constant one-tuple source feeds an unbounded search
+     * (no low key, no high key) of the secondary B-tree, whose output goes to a
+     * printer. Every operator is constrained to {@code nc1} and {@code nc2}.
+     *
+     * @return the connected job specification with the printer as its root
+     * @throws AsterixException propagated from the calls made while building the job
+     * @throws HyracksDataException propagated from the calls made while building the job
+     */
+    @SuppressWarnings("unchecked")
+    public JobSpecification createJobSpec() throws AsterixException, HyracksDataException {
+        JobSpecification spec = new JobSpecification();
+
+        // ---------- START GENERAL BTREE STUFF
+        IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
+        IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
+        // ---------- END GENERAL BTREE STUFF
+
+        // Every operator below is constrained to run on exactly these nodes.
+        List<String> nodeGroup = new ArrayList<String>();
+        nodeGroup.add("nc1");
+        nodeGroup.add("nc2");
+
+        // ---------- START KEY PROVIDER OP
+        // TODO: should actually be empty tuple source
+        // Both search bounds are null below, so this one string field is only a dummy.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        AObjectSerializerDeserializer.INSTANCE.serialize(new AString("Jodi Rotruck"), dos);
+        tb.addFieldEndOffset();
+
+        ISerializerDeserializer[] keyRecDescSers = { AObjectSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, locationsOf(nodeGroup));
+
+        // ---------- END KEY PROVIDER OP
+
+        // ---------- START SECONDARY INDEX SCAN
+        // Field 0 is variable length; field 1 is declared as a fixed 5 bytes.
+        ITypeTraits[] secondaryTypeTraits = { typeTraits(false, -1), typeTraits(true, 5) };
+        ITreeIndexFrameFactory interiorFrameFactory = AqlMetadataProvider
+                .createBTreeNSMInteriorFrameFactory(secondaryTypeTraits);
+        ITreeIndexFrameFactory leafFrameFactory = AqlMetadataProvider.createBTreeNSMLeafFrameFactory(secondaryTypeTraits);
+
+        ISerializerDeserializer[] secondaryRecFields = { AObjectSerializerDeserializer.INSTANCE,
+                AObjectSerializerDeserializer.INSTANCE };
+        IBinaryComparatorFactory[] secondaryComparatorFactories = { AObjectAscBinaryComparatorFactory.INSTANCE,
+                AObjectAscBinaryComparatorFactory.INSTANCE };
+
+        int[] lowKeyFields = null; // -infinity
+        int[] highKeyFields = null; // +infinity
+        RecordDescriptor secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        // TODO: change file splits according to mount points in cluster config
+        IFileSplitProvider secondarySplitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit("nc1", new FileReference(new File("/tmp/nc1/demo1112/Customers_idx_NameBtreeIndex"))),
+                new FileSplit("nc2", new FileReference(new File("/tmp/nc2/demo1112/Customers_idx_NameBtreeIndex"))) });
+        BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                storageManager, btreeRegistryProvider, secondarySplitProvider, interiorFrameFactory, leafFrameFactory,
+                secondaryTypeTraits, secondaryComparatorFactories, true, lowKeyFields, highKeyFields, true, true,
+                new BTreeDataflowHelperFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondarySearchOp, locationsOf(nodeGroup));
+
+        // ---------- END SECONDARY INDEX SCAN
+
+        // Sink operator fed by the index search.
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, locationsOf(nodeGroup));
+
+        // ---------- START CONNECT THE OPERATORS
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondarySearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondarySearchOp, 0, printer, 0);
+
+        // ---------- END CONNECT THE OPERATORS
+
+        spec.addRoot(printer);
+        return spec;
+    }
+
+    /** Copies the node names into a fresh array for one location constraint. */
+    private static String[] locationsOf(List<String> nodeGroup) {
+        return nodeGroup.toArray(new String[nodeGroup.size()]);
+    }
+
+    /**
+     * Creates type traits that report the given constants. Built in a static
+     * context, so the anonymous class holds no reference to a driver instance.
+     */
+    private static ITypeTraits typeTraits(final boolean fixedLength, final int length) {
+        return new ITypeTraits() {
+            @Override
+            public boolean isFixedLength() {
+                return fixedLength;
+            }
+
+            @Override
+            public int getFixedLength() {
+                return length;
+            }
+        };
+    }
+
+    /**
+     * Runs the job against a Hyracks cluster and prints "start end elapsed"
+     * (milliseconds) to stderr.
+     * <p>
+     * The job is created under the application name from the command line
+     * ({@code "asterix"} when no arguments are given).
+     *
+     * @param args empty for the built-in defaults, or exactly three values: Hyracks
+     *            host, application name, DDL file; any other count prints usage
+     * @throws Exception if connecting to Hyracks or creating/running the job fails
+     */
+    public static void main(String[] args) throws Exception {
+        String host;
+        String appName;
+        String ddlFile; // parsed for CLI compatibility; only echoed (defaults case), never used
+
+        switch (args.length) {
+            case 0:
+                host = "127.0.0.1";
+                appName = "asterix";
+                ddlFile = "/home/nicnic/workspace/asterix/trunk/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
+                System.out.println("No arguments specified, using defauls:");
+                System.out.println("HYRACKS HOST: " + host);
+                System.out.println("APPNAME: " + appName);
+                System.out.println("DDLFILE: " + ddlFile);
+                break;
+
+            case 3:
+                host = args[0];
+                appName = args[1];
+                ddlFile = args[2];
+                break;
+
+            default:
+                System.out.println("USAGE:");
+                System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
+                System.out.println("ARG 2: Application Name (e.g., asterix)");
+                System.out.println("ARG 3: DDL File");
+                System.exit(0);
+                return; // not reached; lets the compiler treat the locals as assigned
+        }
+
+        IHyracksClientConnection hcc = new HyracksConnection(host, HYRACKS_PORT);
+        JobSpecification jobSpec = new TestSecondaryIndexJob().createJobSpec();
+        // Submit under the requested application name rather than a fixed "asterix".
+        JobId jobId = hcc.createJob(appName, jobSpec);
+
+        // Only start-to-completion is timed; job creation is excluded.
+        long start = System.currentTimeMillis();
+        hcc.start(jobId);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixNodeState.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixNodeState.java
new file mode 100644
index 0000000..dbf1625
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixNodeState.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.io.Serializable;
+
+/** Serializable holder for per-node state: the API node-data server port. */
+public class AsterixNodeState implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private int apiNodeDataServerPort;
+
+ /** @return the stored API node-data server port (0 until one is set) */
+ public int getAPINodeDataServerPort() {
+ return apiNodeDataServerPort;
+ }
+
+ /** @param port value to store as the API node-data server port */
+ public void setAPINodeDataServerPort(int port) {
+ this.apiNodeDataServerPort = port;
+ }
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCBootstrapImpl.java
new file mode 100644
index 0000000..bcbfb8c
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCBootstrapImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import edu.uci.ics.asterix.api.aqlj.server.APIClientThreadFactory;
+import edu.uci.ics.asterix.api.aqlj.server.ThreadedServer;
+import edu.uci.ics.asterix.api.http.servlet.APIServlet;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
+import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+
+/**
+ * Cluster-controller bootstrap for Asterix: runs the web (servlet) server and
+ * the API server, and publishes the shared state proxy together with a
+ * node-data server port for every node.
+ */
+public class CCBootstrapImpl implements ICCBootstrap {
+    private static final Logger LOGGER = Logger.getLogger(CCBootstrapImpl.class.getName());
+
+    private static final int DEFAULT_WEB_SERVER_PORT = 19001;
+    public static final int DEFAULT_API_SERVER_PORT = 14600;
+    private static final int DEFAULT_API_NODEDATA_SERVER_PORT = 14601;
+
+    private Server server;
+    private static IAsterixStateProxy proxy;
+    private ICCApplicationContext appCtx;
+    private ThreadedServer apiServer;
+
+    /**
+     * Starts the web server, registers the state proxy, assigns node-data server
+     * ports (consecutive, starting at 14601) to all nodes and starts the API server.
+     *
+     * @throws Exception if a server fails to start, the web port property is not
+     *             an integer, or the IP-address/node map cannot be obtained
+     */
+    @Override
+    public void start() throws Exception {
+        LOGGER.info("Starting Asterix CC Bootstrap");
+        // The web server port can be overridden through a system property.
+        String portStr = System.getProperty(GlobalConfig.WEB_SERVER_PORT_PROPERTY);
+        int port = (portStr != null) ? Integer.parseInt(portStr) : DEFAULT_WEB_SERVER_PORT;
+        server = new Server(port);
+        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        context.setContextPath("/");
+        server.setHandler(context);
+
+        context.addServlet(new ServletHolder(new APIServlet()), "/*");
+        server.start();
+        proxy = AsterixStateProxy.registerRemoteObject();
+        proxy.setAsterixProperties(AsterixProperties.INSTANCE);
+
+        // set the APINodeDataServer ports
+        int nextPort = DEFAULT_API_NODEDATA_SERVER_PORT;
+        Map<String, Set<String>> nodeNameMap = new HashMap<String, Set<String>>();
+        try {
+            appCtx.getCCContext().getIPAddressNodeMap(nodeNameMap);
+        } catch (Exception e) {
+            throw new IOException(" unable to obtain IP address node map", e);
+        }
+        for (Set<String> nodeNames : nodeNameMap.values()) {
+            for (String nodeName : nodeNames) {
+                AsterixNodeState ns = new AsterixNodeState();
+                ns.setAPINodeDataServerPort(nextPort++);
+                proxy.setAsterixNodeState(nodeName, ns);
+            }
+        }
+
+        appCtx.setDistributedState(proxy);
+        MetadataManager.INSTANCE = new MetadataManager(proxy);
+        apiServer = new ThreadedServer(DEFAULT_API_SERVER_PORT, new APIClientThreadFactory(appCtx));
+        apiServer.start();
+    }
+
+    /**
+     * Unregisters the state proxy, then stops the web server and the API server.
+     * A server that was never created is skipped, and each later step still runs
+     * when an earlier one throws.
+     *
+     * @throws Exception the failure of the last step that threw, if any
+     */
+    @Override
+    public void stop() throws Exception {
+        LOGGER.info("Stopping Asterix CC Bootstrap");
+        try {
+            AsterixStateProxy.deRegisterRemoteObject();
+        } finally {
+            try {
+                if (server != null) {
+                    server.stop();
+                }
+            } finally {
+                if (apiServer != null) {
+                    apiServer.shutdown();
+                }
+            }
+        }
+    }
+
+    /** Stores the application context that {@link #start()} reads the node map from. */
+    @Override
+    public void setApplicationContext(ICCApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
new file mode 100644
index 0000000..63fb497
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.rmi.server.UnicastRemoteObject;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.aqlj.server.NodeDataClientThreadFactory;
+import edu.uci.ics.asterix.api.aqlj.server.ThreadedServer;
+import edu.uci.ics.asterix.api.common.AsterixAppContextInfoImpl;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.context.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataNode;
+import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
+import edu.uci.ics.asterix.metadata.api.IMetadataNode;
+import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataBootstrap;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+
+public class NCBootstrapImpl implements INCBootstrap {
+ private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
+
+ public static final int DEFAULT_AQLJ_NODE_DATA_SERVER_PORT = 6061;
+
+ private INCApplicationContext ncAppContext = null;
+
+ private static IMetadataNode metadataNode;
+ private String nodeId;
+
+ private ThreadedServer apiNodeDataServer;
+
+ @Override
+ public void start() throws Exception {
+
+ LOGGER.info("Starting Asterix NC " + nodeId + " Bootstrap");
+ IAsterixStateProxy p = (IAsterixStateProxy) ncAppContext.getDistributedState();
+ LOGGER.info("\nMetadata node " + p.getAsterixProperties().getMetadataNodeName());
+ initializeTransactionSupport(ncAppContext, nodeId);
+ if (nodeId.equals(p.getAsterixProperties().getMetadataNodeName())) {
+ AsterixAppRuntimeContext.initialize(ncAppContext);
+ LOGGER.info("Initialized AsterixRuntimeContext: " + AsterixAppRuntimeContext.getInstance());
+ metadataNode = registerRemoteObject(ncAppContext, p.getAsterixProperties());
+ p.setMetadataNode(metadataNode);
+ MetadataManager.INSTANCE = new MetadataManager(p);
+ LOGGER.info("Bootstrapping Metadata");
+ MetadataManager.INSTANCE.init();
+ MetadataBootstrap.startUniverse(p.getAsterixProperties(), AsterixAppContextInfoImpl.INSTANCE);
+ } else {
+ Thread.sleep(5000);
+ AsterixAppRuntimeContext.initialize(ncAppContext);
+ LOGGER.info("Initialized AsterixRuntimeContext: " + AsterixAppRuntimeContext.getInstance());
+ }
+
+ IAsterixStateProxy proxy = (IAsterixStateProxy) ncAppContext.getDistributedState();
+ AsterixNodeState ns = (AsterixNodeState) proxy.getAsterixNodeState(ncAppContext.getNodeId());
+ apiNodeDataServer = new ThreadedServer(ns.getAPINodeDataServerPort(), new NodeDataClientThreadFactory());
+ apiNodeDataServer.start();
+ }
+
+ public static IMetadataNode registerRemoteObject(INCApplicationContext ncAppContext,
+ AsterixProperties asterixProperties) throws AsterixException {
+ try {
+ TransactionProvider factory = (TransactionProvider) ncAppContext.getApplicationObject();
+ MetadataNode.INSTANCE = new MetadataNode(asterixProperties, AsterixAppContextInfoImpl.INSTANCE, factory);
+ IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, 0);
+ LOGGER.info("MetadataNode bound.");
+ return stub;
+ } catch (Exception e) {
+ LOGGER.info("MetadataNode exception.");
+ throw new AsterixException(e);
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.info("Stopping Asterix NC Bootstrap");
+ IAsterixStateProxy p = (IAsterixStateProxy) ncAppContext.getDistributedState();
+ if (nodeId.equals(p.getAsterixProperties().getMetadataNodeName())) {
+ MetadataBootstrap.stopUniverse();
+ }
+ AsterixAppRuntimeContext.deinitialize();
+ apiNodeDataServer.shutdown();
+ }
+
+ @Override
+ public void setApplicationContext(INCApplicationContext appCtx) {
+ this.ncAppContext = appCtx;
+ this.nodeId = ncAppContext.getNodeId();
+ }
+
+ private void initializeTransactionSupport(INCApplicationContext ncAppContext, String nodeId) {
+ try {
+ TransactionProvider factory = new TransactionProvider(nodeId);
+ ncAppContext.setApplicationObject(factory);
+ } catch (ACIDException e) {
+ e.printStackTrace();
+ LOGGER.severe(" Could not initialize transaction support ");
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
new file mode 100644
index 0000000..85a6912
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator descriptor that carries one pre-serialized tuple and creates a
+ * {@link ConstantTupleSourceOperatorNodePushable} to emit it at runtime.
+ */
+public class ConstantTupleSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
+
+    /**
+     * @param spec job specification passed to the superclass constructor
+     * @param recDesc record descriptor installed at index 0 of the descriptors
+     * @param fieldSlots field slots of the tuple, passed through to the runtime
+     * @param tupleData serialized tuple bytes, passed through to the runtime
+     * @param tupleSize tuple size, passed through to the runtime
+     */
+    public ConstantTupleSourceOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc, int[] fieldSlots,
+            byte[] tupleData, int tupleSize) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = recDesc;
+        this.fieldSlots = fieldSlots;
+        this.tupleData = tupleData;
+        this.tupleSize = tupleSize;
+    }
+
+    /** Creates the runtime for the stored tuple; only {@code ctx} is used, the other arguments are ignored. */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new ConstantTupleSourceOperatorNodePushable(ctx, fieldSlots, tupleData, tupleSize);
+    }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
new file mode 100644
index 0000000..34a8e5e
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Source runtime that writes at most one constant tuple into a single frame
+ * and flushes that frame to its output writer.
+ */
+public class ConstantTupleSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
+
+    /**
+     * @param ctx task context used to allocate the output frame
+     * @param fieldSlots field slots of the tuple; {@code null} means "no tuple"
+     * @param tupleData serialized tuple bytes; {@code null} means "no tuple"
+     * @param tupleSize length handed to the appender, starting at offset 0;
+     *            values {@code <= 0} mean "no tuple"
+     */
+    public ConstantTupleSourceOperatorNodePushable(IHyracksTaskContext ctx, int[] fieldSlots, byte[] tupleData,
+            int tupleSize) {
+        super();
+        this.fieldSlots = fieldSlots;
+        this.tupleData = tupleData;
+        this.tupleSize = tupleSize;
+        this.ctx = ctx;
+    }
+
+    /**
+     * Appends the tuple (when one was supplied) to a freshly allocated frame,
+     * then opens the writer, flushes the frame and closes the writer. The frame
+     * is flushed even when no tuple was appended.
+     *
+     * @throws HyracksDataException propagated from frame allocation or the writer
+     */
+    @Override
+    public void initialize() throws HyracksDataException {
+        ByteBuffer writeBuffer = ctx.allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(writeBuffer, true);
+        if (fieldSlots != null && tupleData != null && tupleSize > 0) {
+            // NOTE(review): the outcome of append is not checked here - confirm
+            // that a tuple too large for one frame cannot reach this point.
+            appender.append(fieldSlots, tupleData, 0, tupleSize);
+        }
+        writer.open();
+        try {
+            FrameUtils.flushFrame(writeBuffer, writer);
+        } finally {
+            // Close the writer even when the flush fails so it is not left open.
+            writer.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/asterix-idefix.properties b/asterix-app/src/main/resources/asterix-idefix.properties
new file mode 100755
index 0000000..278bcb4
--- /dev/null
+++ b/asterix-app/src/main/resources/asterix-idefix.properties
@@ -0,0 +1,2 @@
+nc1.stores=/home/nicnic/Work/Asterix/tests/tpch/nc1data
+nc2.stores=/home/nicnic/Work/Asterix/tests/tpch/nc2data
diff --git a/asterix-app/src/main/resources/asterix-metadata.properties b/asterix-app/src/main/resources/asterix-metadata.properties
new file mode 100644
index 0000000..e9ccc63
--- /dev/null
+++ b/asterix-app/src/main/resources/asterix-metadata.properties
@@ -0,0 +1,4 @@
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=/tmp/nc1data/
+nc2.stores=/tmp/nc2data/, /tmp/nc2data1/
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/asterix-peach.properties b/asterix-app/src/main/resources/asterix-peach.properties
new file mode 100644
index 0000000..20a6eeb
--- /dev/null
+++ b/asterix-app/src/main/resources/asterix-peach.properties
@@ -0,0 +1,2 @@
+nc1.stores=/tmp/nc1/
+nc2.stores=/tmp/nc2/
diff --git a/asterix-app/src/main/resources/asterix-rainbow.properties b/asterix-app/src/main/resources/asterix-rainbow.properties
new file mode 100644
index 0000000..d5febe4
--- /dev/null
+++ b/asterix-app/src/main/resources/asterix-rainbow.properties
@@ -0,0 +1,5 @@
+rainbow-01.stores=/data/onose/rainbow-01/
+rainbow-02.stores=/data/onose/rainbow-02/
+rainbow-03.stores=/data/onose/rainbow-03/
+rainbow-04.stores=/data/onose/rainbow-04/
+rainbow-05.stores=/data/onose/rainbow-05/
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/asterix.properties b/asterix-app/src/main/resources/asterix.properties
new file mode 100755
index 0000000..78cd2b9
--- /dev/null
+++ b/asterix-app/src/main/resources/asterix.properties
@@ -0,0 +1,10 @@
+asterix-001.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-002.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-003.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-004.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-005.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-006.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-007.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-008.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-009.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
+asterix-010.stores=/mnt/data/sda/space/onose,/mnt/data/sdb/space/onose,/mnt/data/sdc/space/onose,/mnt/data/sdd/space/onose
diff --git a/asterix-app/src/main/resources/hyracks-deployment.properties b/asterix-app/src/main/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..a8a943e
--- /dev/null
+++ b/asterix-app/src/main/resources/hyracks-deployment.properties
@@ -0,0 +1,2 @@
+cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/hyracks-initdb-deployment.properties b/asterix-app/src/main/resources/hyracks-initdb-deployment.properties
new file mode 100644
index 0000000..e40db59
--- /dev/null
+++ b/asterix-app/src/main/resources/hyracks-initdb-deployment.properties
@@ -0,0 +1,2 @@
+cc.bootstrap.class=edu.uci.ics.initdb.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.initdb.hyracks.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/idefix-4nc.properties b/asterix-app/src/main/resources/idefix-4nc.properties
new file mode 100755
index 0000000..747eb41
--- /dev/null
+++ b/asterix-app/src/main/resources/idefix-4nc.properties
@@ -0,0 +1,4 @@
+nc1.stores=/home/nicnic/Work/Asterix/tests/tpch/nc1data
+nc2.stores=/home/nicnic/Work/Asterix/tests/tpch/nc2data
+nc3.stores=/home/nicnic/Work/Asterix/tests/tpch/nc3data
+nc4.stores=/home/nicnic/Work/Asterix/tests/tpch/nc4data
diff --git a/asterix-app/src/main/resources/test.properties b/asterix-app/src/main/resources/test.properties
new file mode 100755
index 0000000..01a593b
--- /dev/null
+++ b/asterix-app/src/main/resources/test.properties
@@ -0,0 +1,5 @@
+MetadataNode=nc1
+NewUniverse=true
+nc1.stores=/tmp/nc1data/
+nc2.stores=/tmp/nc2data/
+OutputDir=/tmp/asterix_output/
diff --git a/asterix-app/src/main/resources/testnc1.properties b/asterix-app/src/main/resources/testnc1.properties
new file mode 100755
index 0000000..c0ad3de
--- /dev/null
+++ b/asterix-app/src/main/resources/testnc1.properties
@@ -0,0 +1 @@
+nc1.stores=nc1data
diff --git a/asterix-app/src/main/scripts/run.cmd b/asterix-app/src/main/scripts/run.cmd
new file mode 100644
index 0000000..b8eb4a0
--- /dev/null
+++ b/asterix-app/src/main/scripts/run.cmd
@@ -0,0 +1,63 @@
+@ECHO OFF
+SETLOCAL
+
+:: Licensed to the Apache Software Foundation (ASF) under one or more
+:: contributor license agreements. See the NOTICE file distributed with
+:: this work for additional information regarding copyright ownership.
+:: The ASF licenses this file to You under the Apache License, Version 2.0
+:: (the "License"); you may not use this file except in compliance with
+:: the License. You may obtain a copy of the License at
+::
+:: http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing, software
+:: distributed under the License is distributed on an "AS IS" BASIS,
+:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+:: See the License for the specific language governing permissions and
+:: limitations under the License.
+
+:: JAVA classpath
+:: Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+:: the classpath. Separate multiple paths with ";" (this is a Windows script).
+:: Enclose the whole value in double quotes: it is expanded as-is after
+:: "-classpath" on the java command line at the bottom of this file.
+:: Note: If running under cygwin use "/cygdrive/c/..." for "C:/..."
+:: Example:
+::
+:: Set the CLASSPATH to a jar file and a directory. Note that
+:: "classes dir" is a directory of class files with a space in the name.
+:: Both entries go inside one pair of double quotes.
+::
+:: SET CLASSPATH="C:\Product1\lib\product.jar;..\MyProject\classes dir"
+:: Put custom SET lines after the assignment below; it overwrites earlier values.
+SET CLASSPATH="@classpath@"
+
+:: JVM parameters
+:: If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+:: for the Java virtual machine, set the local variable JVM_PARAMETERS below.
+:: Example:
+:: SET JVM_PARAMETERS=-Xms100M -Xmx200M
+::
+:: Below are the JVM parameters needed to do remote debugging using IntelliJ
+:: IDEA. Uncomment and then do: SET JVM_PARAMETERS=%IDEA_REMOTE_DEBUG_PARAMS%
+:: SET IDEA_REMOTE_DEBUG_PARAMS=-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005
+::
+:: SET JVM_PARAMETERS=
+
+:: ---------------------------------------------------------------------------
+:: Default configuration. Do not modify below this line.
+:: ---------------------------------------------------------------------------
+:: Application specific parameters. The @...@ tokens look like build-time
+:: placeholders -- TODO confirm which build step substitutes them.
+SET MAIN_CLASS=@main.class@
+SET JVM_PARAMS=@jvm.params@
+SET PROGRAM_PARAMS=@program.params@
+
+:: Locate the Java launcher. A JAVA value supplied by the caller always wins;
+:: otherwise use JAVA_HOME\bin\java.exe, and plain java.exe as the last resort.
+IF NOT DEFINED JAVA IF DEFINED JAVA_HOME SET JAVA="%JAVA_HOME%\bin\java.exe"
+IF NOT DEFINED JAVA SET JAVA="java.exe"
+
+:: Launch the application. JVM_PARAMS comes from the template above,
+:: JVM_PARAMETERS from the user; %* forwards this script's own arguments.
+%JAVA% %JVM_PARAMS% %JVM_PARAMETERS% -classpath %CLASSPATH% %MAIN_CLASS% %PROGRAM_PARAMS% %*
diff --git a/asterix-app/src/main/scripts/run.sh b/asterix-app/src/main/scripts/run.sh
new file mode 100644
index 0000000..a998626
--- /dev/null
+++ b/asterix-app/src/main/scripts/run.sh
@@ -0,0 +1,81 @@
+#!/bin/sh
+# JAVA classpath
+# Use the local variable CLASSPATH to add custom entries (e.g. JDBC drivers) to
+# the classpath. Separate multiple paths with ":". Enclose the value
+# in double quotes. Adding additional files or locations on separate
+# lines makes things clearer.
+# Note: If running under Cygwin, use "/cygdrive/c/..." for "C:/..."
+# Example:
+#
+# Set the CLASSPATH to a jar file and a directory. Note that
+# "classes dir" is a directory of class files with a space in the name.
+#
+# CLASSPATH="/usr/local/Product1/lib/product.jar"
+# CLASSPATH="${CLASSPATH}:../MyProject/classes dir"
+# Put custom lines after the assignment below; it overwrites earlier values.
+CLASSPATH="@classpath@"
+
+# JVM parameters
+# If you want to modify the default parameters (e.g. maximum heap size -Xmx)
+# for the Java virtual machine, set the local variable JVM_PARAMETERS below.
+# Example (the quotes are required because the value contains a space):
+# JVM_PARAMETERS="-Xms100M -Xmx200M"
+#
+# Below are the JVM parameters needed to do remote debugging using IntelliJ
+# IDEA. Uncomment and then do: JVM_PARAMETERS="$IDEA_REMOTE_DEBUG_PARAMS"
+# IDEA_REMOTE_DEBUG_PARAMS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
+#
+# JVM_PARAMETERS=
+
+# Run with shared memory setup (disabled; uncomment the three lines to enable)
+#if [ -n "${RUN_SHARED_MEM}" ]; then
+# JVM_PARAMETERS="${JVM_PARAMETERS} -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=n,address=javadebug,suspend=y"
+#fi
+
+# ---------------------------------------------------------------------------
+# Default configuration. Do not modify below this line.
+# ---------------------------------------------------------------------------
+# Application specific parameters. The @...@ tokens look like build-time
+# placeholders -- TODO confirm which build step substitutes them.
+MAIN_CLASS="@main.class@"
+JVM_PARAMS="@jvm.params@"
+PROGRAM_PARAMS="@program.params@"
+
+# Cygwin support. $cygwin is executed as a command further down, so it
+# _must_ hold either "true" or "false".
+cygwin=false
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+esac
+
+# Under Cygwin, bring both paths into UNIX form before anything uses them;
+# empty values are left alone.
+if $cygwin; then
+  if [ -n "$JAVA_HOME" ]; then JAVA_HOME=`cygpath --unix "$JAVA_HOME"`; fi
+  if [ -n "$CLASSPATH" ]; then CLASSPATH=`cygpath --path --unix "$CLASSPATH"`; fi
+fi
+
+# Try to find java virtual machine: a non-empty $JAVA is used untouched,
+# otherwise pick $JAVA_HOME/bin/java, or plain "java" when JAVA_HOME is empty.
+if [ -z "${JAVA}" ]; then
+  case "${JAVA_HOME}" in
+    "") JAVA=java ;;
+    *)  JAVA=${JAVA_HOME}/bin/java ;;
+  esac
+fi
+
+# Try to find directory where this script is located: first assume $0 is
+# relative to the current directory, otherwise use $0 exactly as invoked.
+COMMAND="${PWD}/$0"
+[ -f "${COMMAND}" ] || COMMAND="$0"
+# BASEDIR is whatever precedes the last "/" of COMMAND (empty if there is none).
+BASEDIR=`expr "${COMMAND}" : '\(.*\)/\.*'`
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+#  JAVA=`cygpath --path --windows "$JAVA"`
+  CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+fi
+
+# Run program. "$@" passes each caller argument through intact; the *_PARAMS values stay unquoted so they word-split.
+"${JAVA}" ${JVM_PARAMS} ${JVM_PARAMETERS} -classpath "${CLASSPATH}" ${MAIN_CLASS} ${PROGRAM_PARAMS} "$@"