Apache Ignite Documentation

Continuous Queries

Continuously obtain real-time query results.

Continuous queries let you listen to data modifications occurring in Ignite caches. Once a continuous query is started, you are notified of every data change that passes the query's filter, or of every change if no filter is set.

This functionality is available through the ContinuousQuery class, which is described below.

Initial Query

When preparing a continuous query for execution, you can optionally specify an initial query, which is executed before the continuous query is registered in the cluster and before you start receiving updates.

The initial query is set with the ContinuousQuery.setInitialQuery(Query) method and can be of any query type: Scan, SQL, or TEXT.

Remote Filter

The remote filter is executed on the primary and backup nodes for a given key and decides whether an update should be propagated as an event to the query's local listener.

If the filter returns true, the local listener is notified; otherwise, the notification is skipped. Filtering updates on the primary and backup nodes where they occur reduces unnecessary network traffic between those nodes and the local listeners running on the application side.

A remote filter can be set via the ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>) method.

Local Listener

When a cache is modified (an entry is inserted, updated, or deleted), an event describing the update is sent to the continuous query's local listener so that your application can react accordingly.

Events that pass the remote filter are sent to the client, where the local listener is notified. The local listener is set via the ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>) method.

IgniteCache<Integer, String> cache = ignite.cache("mycache");

// Creating a continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Setting an optional initial query.
// The query will return entries for the keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>((k, v) -> k > 10));

// Local listener that is called locally when an update notification is received.
// The events arrive as an Iterable, which has forEach() but no stream().
qry.setLocalListener((evts) ->
    evts.forEach(e -> System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));

// This filter will be evaluated remotely on all nodes.
// Entries that pass this filter will be sent to the local listener.
qry.setRemoteFilter(e -> e.getKey() > 10);

// Executing the query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterating over existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Adding a few more cache entries.
  // As a result, the local listener above will be called.
  for (int i = 5; i < 15; i++)
    cache.put(i, Integer.toString(i));
}
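
The same example written with anonymous classes instead of lambdas (CACHE_NAME is assumed to be a String constant defined elsewhere in the example):

IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

// Creating a continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();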

// Setting an optional initial query. 
// The query will return entries for the keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>(
  new IgniteBiPredicate<Integer, String>() {
  @Override public boolean apply(Integer key, String val) {
    return key > 10;
  }
}));

// Local listener that is called locally when an update notification is received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
  @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
    // The loop variable must use the same wildcard types as the Iterable's elements.
    for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
      System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
  }
});

// This filter will be evaluated remotely on all nodes.
// Entries that pass this filter will be sent to the local listener.
qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
  @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
    return e.getKey() > 10;
  }
});

// Execute query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterating over existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Adding a few more cache entries.
  // As a result, the local listener above will be called.
  // keyCnt is assumed to be an int defined earlier in the example.
  for (int i = keyCnt; i < keyCnt + 10; i++)
    cache.put(i, Integer.toString(i));
}

Remote Transformer

By default, continuous queries send the whole updated object to the listeners running on the application side. This can lead to excessive network usage, especially if the transferred object is large. Moreover, applications are often interested in only a subset of the updated object's fields rather than the whole object.

To address these cases, you can use ContinuousQueryWithTransformer, which lets you set a custom transformer factory. The transformer is executed on the remote nodes for every updated object, and only the result of the transformation is sent to the listeners.

// Create a new continuous query with the transformer.
ContinuousQueryWithTransformer qry = new ContinuousQueryWithTransformer();

// Factory to create transformers.
Factory factory = FactoryBuilder.factoryOf(
    // Return one field of a complex object.
    // Only this field will be sent over to a local listener over the network.
    (IgniteClosure<CacheEntryEvent, String>)
        event -> ((Organization)event.getValue()).name());

qry.setRemoteTransformerFactory(factory);

// Listener that will receive transformed data.
qry.setLocalListener(names -> {
    for (Object name : names)
        System.out.println("New organization name: " + name);
});

Examples of usage of ContinuousQueryWithTransformer can be found on GitHub.
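
To make the division of work concrete, the following is a minimal plain-JDK model of the flow described above: a filter and a transformer that stand for code running on the remote node, and a listener that stands for code running in the application. It does not use any Ignite classes; TransformerFlowSketch, its onUpdate method, and this Organization class are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-JDK model of the filter -> transformer -> listener flow. No Ignite classes involved. */
public class TransformerFlowSketch {
    /** Hypothetical value type standing in for a large cached object. */
    public static final class Organization {
        public final String name;
        public final String address;

        public Organization(String name, String address) {
            this.name = name;
            this.address = address;
        }
    }

    /**
     * Models one cache update. The filter and transformer stand for code running on the
     * remote node; only the transformer's result is handed to the local listener.
     */
    public static <V, T> void onUpdate(V updated, Predicate<V> remoteFilter,
        Function<V, T> remoteTransformer, Consumer<T> localListener) {
        if (!remoteFilter.test(updated))
            return; // Filtered out remotely: nothing would cross the network.

        localListener.accept(remoteTransformer.apply(updated));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        Predicate<Organization> filter = org -> org.name.startsWith("A");
        Function<Organization, String> transformer = org -> org.name;

        onUpdate(new Organization("Acme", "1 Main St"), filter, transformer, received::add);
        onUpdate(new Organization("Zeta", "2 Side St"), filter, transformer, received::add);

        System.out.println(received); // prints [Acme]
    }
}
```

In this model the listener only ever sees the name string, never the full Organization, which mirrors the reduction in transferred data that the transformer is meant to provide.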

Events Delivery Guarantees

The continuous query implementation guarantees exactly-once delivery of each event to the client's local listener.

This is possible because every backup node, in addition to the primary node, maintains an update queue. If the primary node crashes or the topology changes for some other reason, every backup node flushes the contents of its internal queue to the client, so that no event goes undelivered to the client's local listener.

To avoid duplicate notifications when all backup nodes flush their queues to the client, Ignite maintains a per-partition update counter. Whenever an entry in a partition is updated, the counter for that partition is incremented on both the primary and backup nodes. The counter value is sent to the client along with the event notification, and the client keeps its own copy of the partition-to-counter mapping. If the client receives an update whose counter is lower than the value in its local map, it treats the update as a duplicate and discards it.
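
The counter check can be sketched in plain Java as follows. This is an illustrative model, not Ignite's internal code: the class name UpdateCounterDedup and its accept method are hypothetical, and the sketch assumes that the client's map holds, for each partition, the next counter value it expects, so that any lower counter marks a duplicate.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical client-side duplicate filter keyed by partition id. Not Ignite's implementation. */
public class UpdateCounterDedup {
    /** Assumption: for each partition, the next update counter the client expects to see. */
    private final Map<Integer, Long> nextExpected = new HashMap<>();

    /**
     * Returns true if the event is new and should reach the local listener,
     * false if its counter is lower than the value recorded for that partition.
     */
    public boolean accept(int partition, long counter) {
        Long expected = nextExpected.get(partition);

        if (expected != null && counter < expected)
            return false; // Already delivered, for example re-sent by a flushing backup node.

        nextExpected.put(partition, counter + 1);

        return true;
    }

    public static void main(String[] args) {
        UpdateCounterDedup dedup = new UpdateCounterDedup();

        System.out.println(dedup.accept(0, 1)); // first event for partition 0
        System.out.println(dedup.accept(0, 2)); // next event for partition 0
        System.out.println(dedup.accept(0, 2)); // same event re-sent by a backup
        System.out.println(dedup.accept(1, 1)); // partitions are tracked independently
    }
}
```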

Once the client confirms that it has received an event, the primary and backup nodes remove the record for that event from their backup queues.
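
A node-side queue with this behavior might look like the sketch below. It is a simplified model under stated assumptions rather than Ignite's actual data structure: BackupQueueSketch and its methods are hypothetical, the queue tracks a single partition, events are represented only by their update counters, and an acknowledgement is assumed to cover every counter up to and including the acknowledged one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical per-partition queue of events the client has not yet acknowledged. */
public class BackupQueueSketch {
    /** Update counters of pending events, oldest first. */
    private final Deque<Long> pending = new ArrayDeque<>();

    /** Records an update so that it can be re-sent if the primary node fails. */
    public void onUpdate(long counter) {
        pending.addLast(counter);
    }

    /** Assumption: the acknowledgement is cumulative and covers all counters <= ackedCounter. */
    public void onAck(long ackedCounter) {
        while (!pending.isEmpty() && pending.peekFirst() <= ackedCounter)
            pending.removeFirst();
    }

    /** Models a flush after a topology change: returns and clears everything still pending. */
    public List<Long> flush() {
        List<Long> out = new ArrayList<>(pending);

        pending.clear();

        return out;
    }

    public static void main(String[] args) {
        BackupQueueSketch queue = new BackupQueueSketch();

        queue.onUpdate(1);
        queue.onUpdate(2);
        queue.onUpdate(3);
        queue.onAck(2); // The client confirmed events 1 and 2.

        System.out.println(queue.flush()); // prints [3]
    }
}
```

Only the unacknowledged event is flushed, so the client's counter check has to discard a re-sent event only when an acknowledgement has not yet reached the node.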

Example

A complete example demonstrating continuous queries, named CacheContinuousQueryExample, ships with every Apache Ignite distribution. The example is also available on GitHub.
