Apache Ignite

GridGain Developer Hub - Apache Ignite™

Welcome to the Apache Ignite developer hub run by GridGain. Here you'll find comprehensive guides and documentation to help you start working with Apache Ignite as quickly as possible, as well as support if you get stuck.

 

GridGain also provides a Community Edition, which is a distribution of Apache Ignite made available by GridGain. It is the fastest and easiest way to get started with Apache Ignite. The Community Edition is generally more stable than the Apache Ignite release available from the Apache Ignite website and may contain extra bug fixes and features that have not yet made it into the release on the Apache website.

 

Let's jump right in!


Distributed Queries

Overview

Ignite supports free-form SQL queries. The SQL syntax is ANSI-99 compliant, which means that you can use any SQL functions, aggregations, groupings, or joins defined by the specification as part of an SQL query.

Furthermore, the queries are fully distributed. The SQL engine is capable not only of mapping a query to specific nodes and reducing their responses into a final result set, but also of joining data sets stored in different caches on various nodes. Additionally, the SQL engine performs in a fault-tolerant fashion, guaranteeing that you will never get an incomplete or wrong result when a new node joins the cluster or an existing one leaves it.

How SQL Queries Work

The Apache Ignite SQL Grid component is tightly coupled with the H2 Database, a fast in-memory and disk-based database written in Java and available under a number of open source licenses.

An embedded H2 instance is always started as part of an Apache Ignite node process whenever the ignite-indexing module is added to the node's classpath. If the node is started from a terminal using the ignite.sh (or ignite.bat) script, copy the {apache_ignite}\libs\optional\ignite-indexing directory to {apache_ignite}\libs\. If you use Maven, add the dependency below to your pom.xml file:

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>${ignite.version}</version>
</dependency>

Apache Ignite leverages H2's SQL query parser and optimizer as well as its execution planner. Lastly, H2 executes a query locally on a particular node (either a distributed query that was mapped to that node or a query executed in LOCAL mode) and passes the local result to the distributed Ignite SQL engine for further processing.

However, the data and the indexes are always stored in the Ignite Data Grid. Additionally, Ignite executes queries in a distributed and fault-tolerant manner, which H2 does not support.

Ignite SQL Grid executes queries in two ways:

First, if a query is executed against a REPLICATED cache on a node where the cache is deployed, Ignite assumes that all the data is available locally and runs a simple local SQL query, passing it directly to the H2 database engine. The same execution flow applies to LOCAL caches.

Local Queries

Learn more about local SQL queries in Ignite on the Local Queries documentation page.

Second, if a query is executed over a PARTITIONED cache, the execution flow is as follows:

  • The query will be parsed and split into multiple map queries and a single reduce query.
  • All the map queries are executed on all the data nodes where cache data resides.
  • Each node returns the result set of its local execution to the query initiator (reducer), which in turn completes the reduce phase by merging the provided result sets.
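As a rough illustration of the map/reduce split, the plain-Java sketch below (not Ignite code; the class and method names are hypothetical) shows why an aggregate such as AVG(salary) cannot simply be averaged per node. Each map step returns a partial SUM and COUNT over the node's local rows, and the reducer combines them into the final average. This mirrors the kind of rewrite a query splitter performs, but it is a simplified model, not Ignite's implementation.

```java
import java.util.List;

/** Toy model of splitting AVG(salary) into map queries and a reduce query. */
final class AvgSplitSketch {

    /** Partial result that the map query returns from one data node. */
    static final class PartialAvg {
        final double sum;
        final long count;

        PartialAvg(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Map phase: runs on a data node over its local rows only (SUM and COUNT). */
    static PartialAvg map(List<Double> localSalaries) {
        double sum = 0;
        for (double salary : localSalaries) {
            sum += salary;
        }
        return new PartialAvg(sum, localSalaries.size());
    }

    /** Reduce phase: runs on the query initiator and combines all partial results. */
    static double reduce(List<PartialAvg> partials) {
        double sum = 0;
        long count = 0;
        for (PartialAvg p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        // Two hypothetical data nodes holding different numbers of rows.
        PartialAvg node1 = map(List.of(1000.0, 2000.0, 3000.0));
        PartialAvg node2 = map(List.of(6000.0));

        // Prints 3000.0; averaging the per-node averages would wrongly give 4000.0.
        System.out.println(reduce(List.of(node1, node2)));
    }
}
```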

Execution Flow of Cross-Cache Queries

The execution flow of cross-cache queries or queries with joins does not differ from the one described for PARTITIONED caches above; it is covered in more detail later in this documentation.

Handling of Result Sets With ORDER BY and GROUP BY

SQL queries with an ORDER BY clause do not require loading the whole result set to the query initiator (reducer) node to complete the sorting. Instead, every node to which the query is mapped sorts its own part of the overall result set, and the reducer merges these sorted parts in a streaming fashion.
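The streaming merge can be sketched as a classic k-way merge. In the hypothetical example below, each Iterator stands for one node's locally sorted result, and a PriorityQueue holds only the current head row of each node, so the reducer never has to materialize the full result set. This illustrates the technique under those assumptions; it is not Ignite's reducer code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

/** Streams one globally sorted sequence out of several locally sorted node results. */
final class SortedMergeSketch {

    /** Current head row of one node's stream plus the rest of that stream. */
    private static final class Head {
        final int value;
        final Iterator<Integer> rest;

        Head(int value, Iterator<Integer> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    static Iterator<Integer> merge(List<Iterator<Integer>> sortedNodeResults) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> Integer.compare(a.value, b.value));
        for (Iterator<Integer> it : sortedNodeResults) {
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        return new Iterator<Integer>() {
            @Override
            public boolean hasNext() {
                return !heap.isEmpty();
            }

            @Override
            public Integer next() {
                if (heap.isEmpty()) {
                    throw new NoSuchElementException();
                }
                Head smallest = heap.poll();
                // Pull exactly one more row, and only from the node that supplied the smallest value.
                if (smallest.rest.hasNext()) {
                    heap.add(new Head(smallest.rest.next(), smallest.rest));
                }
                return smallest.value;
            }
        };
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> nodes = List.of(
            List.of(1, 4, 9).iterator(),
            List.of(2, 3, 10).iterator(),
            List.of(5).iterator());

        List<Integer> out = new ArrayList<>();
        merge(nodes).forEachRemaining(out::add);
        System.out.println(out); // [1, 2, 3, 4, 5, 9, 10]
    }
}
```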

The same optimization is implemented for sorted GROUP BY queries: there is no need to load the whole result set to the reducer in order to do the grouping before returning it to the application. In Apache Ignite, partial result sets from the individual nodes can be streamed, merged, aggregated, and returned to the application gradually.
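Assuming the per-node partial groups have already been merged into one stream ordered by the grouping key (for example, with a k-way merge), the reducer can aggregate and emit each group as soon as the key changes, holding only one group in memory at a time. The sketch below models partial COUNT(*) values per hypothetical department key; the names and data are illustrative only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class StreamingGroupBySketch {

    /**
     * Consumes partial (groupKey, partialCount) rows ordered by groupKey and
     * emits each final group as soon as the key changes.
     */
    static void aggregate(Iterator<Map.Entry<String, Long>> sortedPartials,
                          BiConsumer<String, Long> emit) {
        String currentKey = null;
        long total = 0;
        while (sortedPartials.hasNext()) {
            Map.Entry<String, Long> row = sortedPartials.next();
            if (currentKey != null && !currentKey.equals(row.getKey())) {
                emit.accept(currentKey, total); // previous group is complete, stream it out
                total = 0;
            }
            currentKey = row.getKey();
            total += row.getValue();
        }
        if (currentKey != null) {
            emit.accept(currentKey, total); // flush the last group
        }
    }

    public static void main(String[] args) {
        // Partial COUNT(*) per department from two nodes, already merged by key.
        List<Map.Entry<String, Long>> merged = List.of(
            Map.entry("dev", 3L), Map.entry("dev", 2L),
            Map.entry("ops", 1L),
            Map.entry("qa", 4L), Map.entry("qa", 1L));

        List<String> out = new ArrayList<>();
        aggregate(merged.iterator(), (k, v) -> out.add(k + "=" + v));
        System.out.println(out); // [dev=5, ops=1, qa=5]
    }
}
```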

Query Types

There are two general types of SQL queries available at the Java API level: SqlQuery and SqlFieldsQuery.

Alternative APIs

The Apache Ignite In-Memory SQL Grid is not bound to Java APIs only. You can also connect to an Ignite cluster and execute SQL queries from .NET and C++, or through the Ignite ODBC and JDBC drivers. Learn more about these additional APIs on their dedicated documentation pages.

SqlQuery

SqlQuery is useful when, at the end of query execution, you need to get back whole objects stored in a cache (both key and value) in the final result set. The code snippet below shows how this can be done.

import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlQuery;

// 'ignite' is a started Ignite instance; Person is registered as a queryable type.
IgniteCache<Long, Person> cache = ignite.cache("personCache");

SqlQuery<Long, Person> sql = new SqlQuery<>(Person.class, "salary > ?");

// Find all persons earning more than 1,000.
try (QueryCursor<Cache.Entry<Long, Person>> cursor = cache.query(sql.setArgs(1000))) {
  for (Cache.Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

SqlFieldsQuery

Instead of selecting the whole object, you can choose to select only specific fields to minimize network and serialization overhead. For this purpose, Ignite implements the concept of fields queries. SqlFieldsQuery accepts a conventional ANSI-99 SQL query as its constructor parameter and executes it, as shown in the example below.

import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// Execute query to get names of all employees.
SqlFieldsQuery sql = new SqlFieldsQuery(
  "select concat(firstName, ' ', lastName) from Person");

// Iterate over the result set; each row is a list of the selected fields.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
  for (List<?> row : cursor)
    System.out.println("personName=" + row.get(0));
}

Queryable Fields Definition

Before specific fields can be accessed inside a SqlQuery or SqlFieldsQuery, they have to be annotated at the POJO level or defined in a QueryEntity so that the SQL engine becomes aware of them. Refer to the indexes documentation, which covers this topic.

Accessing Entry's Key and Value

Use the _key and _val keywords in an SQL query to compare against an entry's complete key or value rather than against individual fields. Apply the same keywords if you need to return a key or a value as a result of an SQL query execution.
If a key or value is of a primitive or built-in type (int, String, Date, etc.), it is automatically added to the result set of a query such as SELECT * FROM ...

Cross-Cache Queries

Data can be queried from multiple caches as part of a single SqlQuery or SqlFieldsQuery query. In this case, cache names act as schema names, as in a conventional RDBMS. The name of the cache through which the query is executed (the IgniteCache instance whose query(...) method you call) is used as the default schema name and does not need to be specified explicitly. All other objects that are stored in different caches and need to be queried must be prefixed with the names of their caches (additional schema names).

import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;

// In this example, suppose Person objects are stored in a
// cache named 'personCache' and Organization objects
// are stored in a cache named 'orgCache'.
IgniteCache<Long, Person> personCache = ignite.cache("personCache");

// Select with join between Person and Organization to
// get the names of all the employees of a specific organization.
// Person is aliased as p, so its columns are referenced through p.
SqlFieldsQuery sql = new SqlFieldsQuery(
    "select p.name "
        + "from Person as p, \"orgCache\".Organization as org where "
        + "p.orgId = org.id "
        + "and org.name = ?");

// Execute the query and obtain the query result cursor.
try (QueryCursor<List<?>> cursor = personCache.query(sql.setArgs("Ignite"))) {
    for (List<?> row : cursor)
        System.out.println("Person name=" + row.get(0));
}

In the example above, the SqlFieldsQuery is executed through personCache, so the name of that cache is treated as the default schema name. This is why the Person object is accessed without an explicitly specified schema name (from Person as p). The Organization object, on the other hand, is stored in a separate cache named orgCache, so the name of this cache must be set explicitly as the schema name in the query ("orgCache".Organization as org).
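Unquoted SQL identifiers are usually converted to upper case by the parser, which is why a mixed-case cache name such as orgCache is wrapped in double quotes when used as a schema. If you build such queries programmatically, a small helper like the hypothetical one below (not part of the Ignite API) can produce the qualified name; it applies the standard SQL rule of doubling any embedded double quote.

```java
/** Hypothetical helpers for building schema-qualified table references. */
final class SqlNames {

    /** Quotes an SQL identifier, doubling any embedded double quotes (standard SQL rule). */
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    /** Builds a table reference qualified with a cache (schema) name, e.g. "orgCache".Organization. */
    static String qualified(String cacheName, String table) {
        return quote(cacheName) + "." + table;
    }

    public static void main(String[] args) {
        String sql = "select p.name from Person as p, "
            + qualified("orgCache", "Organization") + " as org "
            + "where p.orgId = org.id and org.name = ?";
        // Prints: select p.name from Person as p, "orgCache".Organization as org where p.orgId = org.id and org.name = ?
        System.out.println(sql);
    }
}
```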

Changing Schema Name

If you prefer to use a schema name that is different from the cache name, use the CacheConfiguration.setSqlSchema(...) method.

Distributed Joins

Ignite supports collocated and non-collocated distributed SQL joins. Moreover, if the data resides in different caches, Ignite allows for cross-cache joins as well.

import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlQuery;

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// SQL join on Person and Organization (note the trailing space before "where").
SqlQuery<Long, Person> sql = new SqlQuery<>(Person.class,
  "from Person as p, \"orgCache\".Organization as org "
  + "where p.orgId = org.id "
  + "and lower(org.name) = lower(?)");

// Find all persons working for Ignite organization.
try (QueryCursor<Cache.Entry<Long, Person>> cursor = cache.query(sql.setArgs("Ignite"))) {
  for (Cache.Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

Joins between PARTITIONED and REPLICATED caches always work without any limitations.

However, if you join at least two PARTITIONED data sets, you must either make sure that the keys you are joining on are collocated or enable the non-collocated joins parameter for the query. Both distributed join modes are explained below.

Data Collocation

To learn more about the data collocation concept and how to use it in practice, refer to the dedicated documentation section.

Distributed Collocated Joins

By default, if an SQL join has to be done across a number of Ignite caches, all the caches have to be collocated. Otherwise, you will get an incomplete result at the end of query execution, because at the join phase each node uses only the data that is available locally. As Picture 1 below shows, an SQL query is first sent to all the nodes (Q) where the data required for the join is located. The query is then executed right away by every node (E(Q)) over its local data set, and, finally, the overall execution result is aggregated on the client side (R).

Picture 1. Collocated SQL Query

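The effect of collocation on a local-only join can be reproduced with a toy model. In the sketch below, nodeFor is a hypothetical stand-in for Ignite's affinity function (the real one maps keys to partitions and partitions to nodes), and every node joins only the Person and Organization rows it stores. Placing persons by their own key loses matches; placing them by orgId, which plays the role of an affinity key, keeps every match local.

```java
import java.util.ArrayList;
import java.util.List;

final class CollocationSketch {
    static final int NODES = 2;

    /** Hypothetical stand-in for an affinity function: key -> node. */
    static int nodeFor(long key) {
        return (int) Math.floorMod(key, (long) NODES);
    }

    /**
     * Each node joins only its local persons with its local organizations, and
     * the per-node results are concatenated. persons[i] = {personId, orgId}.
     */
    static int localJoinRowCount(long[][] persons, long[] orgIds, boolean collocateByOrgId) {
        List<List<long[]>> personsPerNode = new ArrayList<>();
        List<List<Long>> orgsPerNode = new ArrayList<>();
        for (int n = 0; n < NODES; n++) {
            personsPerNode.add(new ArrayList<>());
            orgsPerNode.add(new ArrayList<>());
        }
        for (long orgId : orgIds) {
            orgsPerNode.get(nodeFor(orgId)).add(orgId);
        }
        for (long[] p : persons) {
            // Collocated: place the person by its affinity key (orgId); otherwise by personId.
            long placementKey = collocateByOrgId ? p[1] : p[0];
            personsPerNode.get(nodeFor(placementKey)).add(p);
        }
        int rows = 0;
        for (int n = 0; n < NODES; n++) {
            for (long[] p : personsPerNode.get(n)) {
                if (orgsPerNode.get(n).contains(p[1])) {
                    rows++; // the matching organization is stored on the same node
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        long[][] persons = {{1, 10}, {2, 10}, {3, 11}, {4, 11}}; // {personId, orgId}
        long[] orgIds = {10, 11};
        // Every person has a matching organization, so a complete join returns 4 rows.
        System.out.println("placed by personId: " + localJoinRowCount(persons, orgIds, false)); // 2
        System.out.println("placed by orgId:    " + localJoinRowCount(persons, orgIds, true));  // 4
    }
}
```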

Distributed Non-Collocated Joins

Affinity collocation is a powerful concept: once it is set up for an application's business entities (caches), it lets you execute cross-cache joins in the most optimal way and get a complete and consistent result set. However, there is always a chance that you won't be able to collocate all the data, and thus may not be able to execute the whole range of SQL queries needed to satisfy your use case.

Apache Ignite supports non-collocated distributed joins for cases when it is extremely difficult or impossible to collocate all the data but you still need to execute a number of SQL queries over non-collocated caches.

Do not overuse non-collocated distributed joins in practice: their performance is worse than the performance of affinity-collocation-based joins, because fulfilling a query requires many more network round-trips and much more data movement between the nodes.

When non-collocated distributed joins are enabled for a specific SQL query with the SqlQuery.setDistributedJoins(boolean) parameter, the node to which the query was mapped requests the missing data (the data that is not present locally) from the remote nodes by sending either broadcast or unicast requests. This is depicted in Picture 2 below as a potential data movement step (D(Q)). Unicast requests are sent only when a join is done on a primary key (cache key) or an affinity key, since in that case the node performing the join knows the location of the missing data. Broadcast requests are sent in all other cases.

Picture 2. Non-collocated SQL Query

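The choice between unicast and broadcast requests can be sketched as follows. This is a simplified model under stated assumptions: ownerNode is a hypothetical hash-based mapping rather than Ignite's affinity function, and the method only decides which remote nodes to contact for a single join value.

```java
import java.util.ArrayList;
import java.util.List;

final class MissingDataRequestSketch {

    /** Hypothetical key-to-node mapping standing in for the affinity function. */
    static int ownerNode(Object key, int nodeCount) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    /**
     * Returns the remote nodes that must be asked for rows matching joinValue.
     * Joining on a primary or affinity key means the owner is known (unicast);
     * any other join column means every other node must be asked (broadcast).
     */
    static List<Integer> targets(Object joinValue, boolean joinOnPrimaryOrAffinityKey,
                                 int localNode, int nodeCount) {
        List<Integer> result = new ArrayList<>();
        if (joinOnPrimaryOrAffinityKey) {
            int owner = ownerNode(joinValue, nodeCount);
            if (owner != localNode) {
                result.add(owner); // unicast; nothing to request if the data is local
            }
        } else {
            for (int n = 0; n < nodeCount; n++) {
                if (n != localNode) {
                    result.add(n); // broadcast
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Node 0 of a 4-node cluster needs rows matching the join value 7L.
        System.out.println(targets(7L, true, 0, 4));  // [3]
        System.out.println(targets(7L, false, 0, 4)); // [1, 2, 3]
    }
}
```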

The broadcast and unicast requests that one node sends to another to get the missing data are not sent one by one. Instead, the SQL engine combines them into batches. The batch size can be managed with the SqlQuery.setPageSize(int) parameter.

The following code snippet is taken from the CacheQueryExample included in the Ignite distribution.
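The batching idea can be illustrated with another hypothetical sketch: missing keys are grouped by the node that owns them, and each group is cut into batches of at most pageSize keys, so one request carries many lookups. The owner mapping and the batching policy here are assumptions for illustration; Ignite's actual request batching is internal to the SQL engine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RequestBatchingSketch {

    /**
     * Groups missing keys by owning node (hypothetical key mod nodeCount mapping)
     * and splits each node's keys into batches of at most pageSize keys.
     */
    static Map<Integer, List<List<Long>>> batchByNode(List<Long> missingKeys, int nodeCount, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        Map<Integer, List<List<Long>>> batches = new LinkedHashMap<>();
        for (long key : missingKeys) {
            int node = (int) Math.floorMod(key, (long) nodeCount);
            List<List<Long>> nodeBatches = batches.computeIfAbsent(node, n -> new ArrayList<>());
            // Start a new batch when there is none yet or the last one is full.
            if (nodeBatches.isEmpty() || nodeBatches.get(nodeBatches.size() - 1).size() == pageSize) {
                nodeBatches.add(new ArrayList<>());
            }
            nodeBatches.get(nodeBatches.size() - 1).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> missing = List.of(1L, 3L, 5L, 7L, 2L, 4L);
        // Two nodes, pageSize = 2: prints {1=[[1, 3], [5, 7]], 0=[[2, 4]]}
        System.out.println(batchByNode(missing, 2, 2));
    }
}
```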

import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlQuery;

IgniteCache<Long, Person> cache = ignite.cache("personCache");

// SQL clause query with join over non-collocated data.
String joinSql =
  "from Person, \"orgCache\".Organization as org " +
  "where Person.orgId = org.id " +
  "and lower(org.name) = lower(?)";

SqlQuery<Long, Person> qry = new SqlQuery<Long, Person>(Person.class, joinSql).setArgs("ApacheIgnite");

// Enable distributed joins for the query.
qry.setDistributedJoins(true);

// Execute the query to find out employees for specified organization.
List<Cache.Entry<Long, Person>> employees = cache.query(qry).getAll();

System.out.println("Following people are 'ApacheIgnite' employees (distributed join): " + employees);

Refer to the non-collocated distributed joins blog post for more technical details.

Querying Replicated Caches

If an SQL query is executed only over data stored in replicated caches, you may want to set the SqlQuery.setReplicatedOnly(...) parameter to true. This is a special hint that may let the SQL engine produce a more efficient execution plan for the query.

Known Limitations

Transactional SQL

Presently, SQL queries support only the atomic mode. This means that if a transaction has already committed value A but is still committing value B, an SQL query running in parallel will see A but not B.

Multiversion Concurrency Control (MVCC)

Once the Apache Ignite SQL Grid supports MVCC, it will support the transactional mode as well. MVCC development is tracked in this JIRA ticket.

Example

A complete example that demonstrates the usage of distributed queries covered in this documentation section is delivered as part of every Apache Ignite distribution and is named CacheQueryExample. The example is also available on GitHub.
