Apache Ignite Documentation

Continuous Queries

Continuously obtain real-time query results.

Continuous queries are useful when you want to execute a query and then keep receiving notifications about data changes that match your query filter.

Continuous queries are supported via the ContinuousQuery class, which provides the following:

Initial Query

When executing a continuous query, you can optionally run an initial query before you start listening for updates. The initial query is set via the ContinuousQuery.setInitialQuery(Query) method and can be of any query type: Scan, SQL, or TEXT. This parameter is optional; if it is not set, no initial query is executed.

Remote Filter

This filter is executed on the primary node for a given key and evaluates whether the event should be propagated to the listener. If the filter returns true, the listener is notified; otherwise the event is skipped. Filtering events on the node where they occur minimizes unnecessary network traffic for listener notifications. The remote filter is set via the ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>) method.

Local Listener

Events that pass the remote filter are sent to the client to notify the local listener there. The local listener is set via the ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>) method.
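
To make the interplay of the three pieces concrete, here is a minimal plain-Java sketch that models the flow without a cluster. It is an illustration under stated assumptions, not Ignite code: ToyCache, ContinuousQueryModel, eventsSeen, and eventsSent are hypothetical names, a single in-memory map stands in for the primary node, and delivery is synchronous, whereas real notifications travel over the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Hypothetical toy model; none of these types are Ignite classes.
class ContinuousQueryModel {
    /** Stand-in for a cache whose data lives on one "remote" node. */
    static class ToyCache {
        private final Map<Integer, String> data = new TreeMap<>();
        private BiPredicate<Integer, String> remoteFilter;
        private BiConsumer<Integer, String> localListener;
        int eventsSeen; // updates that occurred while a query was registered
        int eventsSent; // updates that passed the filter and were delivered

        /** Runs the initial query over existing data, then starts listening. */
        List<Integer> query(BiPredicate<Integer, String> initialQuery,
                            BiPredicate<Integer, String> remoteFilter,
                            BiConsumer<Integer, String> localListener) {
            List<Integer> initial = new ArrayList<>();
            data.forEach((k, v) -> {
                if (initialQuery.test(k, v))
                    initial.add(k);
            });
            this.remoteFilter = remoteFilter;
            this.localListener = localListener;
            return initial;
        }

        /** Stores the entry; the filter decides whether the listener hears of it. */
        void put(int key, String val) {
            data.put(key, val);
            if (localListener == null)
                return; // no continuous query registered yet
            eventsSeen++;
            if (remoteFilter.test(key, val)) {
                eventsSent++; // only these would cross the network
                localListener.accept(key, val);
            }
        }
    }

    static String run() {
        ToyCache cache = new ToyCache();
        // Data that exists before the query starts.
        for (int i = 8; i <= 12; i++)
            cache.put(i, Integer.toString(i));

        List<Integer> notified = new ArrayList<>();
        List<Integer> initial = cache.query(
            (k, v) -> k > 10,            // initial query
            (k, v) -> k > 10,            // remote filter
            (k, v) -> notified.add(k));  // local listener

        // Updates made after the query was registered.
        for (int i = 5; i < 15; i++)
            cache.put(i, Integer.toString(i));

        return "initial=" + initial + " notified=" + notified
            + " seen=" + cache.eventsSeen + " sent=" + cache.eventsSent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running the sketch prints initial=[11, 12] notified=[11, 12, 13, 14] seen=10 sent=4. The initial query reports the matching entries that already existed, and of the ten later updates only the four that pass the filter reach the listener, which is the traffic saving the remote filter is meant to provide.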

Java 8 (lambda expressions):

// Imports used by this snippet; `ignite` is an already started Ignite instance.
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;

IgniteCache<Integer, String> cache = ignite.cache("mycache");

// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Optional initial query to select all keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>((k, v) -> k > 10));

// Callback that is called locally when update notifications are received.
qry.setLocalListener((evts) ->
  evts.forEach(e -> System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));

// This filter is evaluated remotely, on the primary node of each updated key.
// Entries that pass this filter are sent to the caller.
qry.setRemoteFilter(e -> e.getKey() > 10);

// Execute query. Closing the cursor at the end of the try block stops the query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterate through existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Add a few more keys and watch a few more query notifications.
  for (int i = 5; i < 15; i++)
    cache.put(i, Integer.toString(i));
}
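
Java 7 (anonymous classes):

// Imports used by this snippet. `ignite`, CACHE_NAME, and keyCnt are
// assumed to be defined elsewhere in the surrounding example.
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;

IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Optional initial query to select all keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>(new IgniteBiPredicate<Integer, String>() {
  @Override public boolean apply(Integer key, String val) {
    return key > 10;
  }
}));

// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
  @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
    // The loop variable must use the same wildcard types as the iterable.
    for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
      System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
  }
});

// This filter is evaluated remotely, on the primary node of each updated key.
// Entries that pass this filter are sent to the caller.
qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
  @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
    return e.getKey() > 10;
  }
});

// Execute query. Closing the cursor at the end of the try block stops the query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterate through existing data.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Add a few more keys and watch more query notifications.
  for (int i = keyCnt; i < keyCnt + 10; i++)
    cache.put(i, Integer.toString(i));
}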
