Apache Ignite

GridGain Developer Hub - Apache Ignitetm

Welcome to the Apache Ignite developer hub run by GridGain. Here you'll find comprehensive guides and documentation to help you start working with Apache Ignite as quickly as possible, as well as support if you get stuck.

 

GridGain also provides Community Edition which is a distribution of Apache Ignite made available by GridGain. It is the fastest and easiest way to get started with Apache Ignite. The Community Edition is generally more stable than the Apache Ignite release available from the Apache Ignite website and may contain extra bug fixes and features that have not made it yet into the release on the Apache website.

 

Let's jump right in!

 

Documentation     Ask a Question     Download

 

Javadoc     Scaladoc     Examples

Persistent Store

Write-through or read-through data to and from persistent storage.

Overview

JCache specification comes with APIs for javax.cache.integration.CacheLoader and javax.cache.integration.CacheWriter which are used for write-through and read-through to and from an underlying persistent storage respectively (e.g. an RDBMS database like Oracle or MySQL, or NoSQL database like MongoDB or Couchbase).

While Ignite allows you to configure the CacheLoader and CacheWriter separately, it is very awkward to implement a transactional store within 2 separate classes, as multiple load and put operations have to share the same connection within the same transaction. To mitigate that, Ignite provides org.apache.ignite.cache.store.CacheStore interface which extends both, CacheLoader and CacheWriter.

Transactions

CacheStore is fully transactional and automatically merges into the ongoing cache transaction.

CacheJdbcPojoStore

Ignite ships with its own CacheJdbcPojoStore which automatically maps Java POJOs to database schema.

Read-Through and Write-Through

Providing a proper cache store implementation is important whenever read-through or write-through behavior is desired. Read-through means that the data will be read from the underlying persistent store whenever it’s not available in cache, and write-through means that the data will be automatically persisted whenever it is updated in cache. All read-through and write-through operations will participate in overall cache transactions and will be committed or rolled back as a whole.

To configure read-through and write-through, you need to implement the CacheStore interface and set cacheStoreFactory as well as readThrough and writeThrough properties of CacheConfiguration, as shown in the examples below.

Write-Behind Caching

In a simple write-through mode, each cache put and remove operation will involve a corresponding request to the persistent store and therefore the overall duration of the cache update might be relatively long. Additionally, an intensive cache update rate can cause an extremely high storage load.

For such cases, Ignite offers an option to perform an asynchronous persistent store update also known as write-behind. The key concept of this approach is to accumulate updates and then asynchronously flush them to the underlying database as a bulk operation. The actual data persistence can be triggered by time-based events (the maximum time that data entry can reside in the queue is limited), by queue-size events (the queue is flushed when it’s size reaches some particular point), or by using both of them in combination in which case either event will trigger the flush.

Update Sequence

With the write-behind approach, only the last update to an entry will be written to the underlying storage. If a cache entry with key key1 is sequentially updated with values value1, value2, and value3 respectively, then only a single store request for (key1, value3) pair will be propagated to the persistent store.

Update Performance

Batch store operations are usually more efficient than a sequence of single store operations. One can exploit this feature by enabling batch operations in the write-behind mode. Update sequences of similar types (put or remove) can be grouped to a single batch. For example, sequential cache puts of (key1, value1), (key2, value2), (key3, value3) will be batched into a single CacheStore.putAll(...) operation.

Write-behind caching can be enabled via the CacheConfiguration.setWriteBehindEnabled(boolean) configuration property. See configuration section below for a full list of configuration properties that allow to customize the behavior of write-behind caching.

CacheStore

CacheStore interface in Ignite is used to write and load data to and from the underlying data store. In addition to the standard JCache loading and storing methods, it also introduces end-of-transaction demarcation and the ability to bulk load a cache from the database.

loadCache()

CacheStore.loadCache() method allows for cache loading without passing all the keys that need to be loaded. It is generally used for hot-loading the cache on startup, but can be also called at any point after the cache has been started.

IgniteCache.loadCache() method will delegate to CacheStore.loadCache() method on every cluster member that is running the cache. To invoke loading only on the local cluster node, use the IgniteCache.localLoadCache() method.

In case of partitioned caches, keys that are not mapped to this node, either as primary or backups, will be automatically discarded by the cache.

load(), write(), delete()

Methods load(), write(), and delete() in the CacheStore are called whenever methods get(), put(), and remove() are called correspondingly on the IgniteCache interface. These methods are used to enable read-through and write-through behavior when working with individual cache entries.

loadAll(), writeAll(), deleteAll()

Methods loadAll(), writeAll(), and deleteAll() in the CacheStore are called whenever methods getAll(), putAll(), and removeAll() are called correspondingly on the IgniteCache interface. These methods are used to enable read-through and write-through behavior when working with multiple cache entries and should generally be implemented using batch operations to provide better performance.

CacheStoreAdapter provides default implementation for loadAll(), writeAll(), and deleteAll() methods which simply iterates through all keys one by one.

sessionEnd()

Ignite has a concept of store session which may span more than one cache store operation. Sessions are especially useful when working with transactions.

In case of ATOMIC caches, method sessionEnd() is called after completion of each CacheStore method. In case of TRANSACTIONAL caches, sessionEnd() is called at the end of each transaction, which allows to either commit or rollback multiple operations on the underlying persistent store.

CacheStoreAdapater provides default empty implementation of sessionEnd() method.

Cassandra Cache Store

Ignite provides out of the box integration with Apache Cassandra that is used as a CacheStore at the level of the in-memory data grid. To learn more about the integration refer to the following documentation section.

CacheStoreSession

The main purpose of cache store session is to hold the context between multiple store invocations whenever CacheStore is used in a cache transaction. For example, if using JDBC, you can store the ongoing database connection via the CacheStoreSession.attach() method. You can then commit this connection in the CacheStore#sessionEnd(boolean) method.

CacheStoreSession can be injected into your cache store implementation via the @GridCacheStoreSessionResource annotation.

CacheStore Example

Below are a couple of different possible cache store implementations. Note that transactional implementation works with and without transactions.

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();
          
          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());
          
          st.executeUpdate();
        }
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
    }
  }

  // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Open JDBC connection.
  private Connection connection() throws SQLException  {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(true);

    return conn;
  }
}
public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  /** Auto-injected store session. */
  @CacheStoreSessionResource
  private CacheStoreSession ses;

  // Complete transaction or simply close connection if there is no transaction.
  @Override public void sessionEnd(boolean commit) {
    try (Connection conn = ses.getAttached()) {
      if (conn != null && ses.isWithinTransaction()) {
        if (commit)
          conn.commit();
        else
          conn.rollback();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to end store session.", e);
    }
  }

  // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();
          
          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());
          
          st.executeUpdate();
        }
      }
    }        
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
    }
  }

  // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Opens JDBC connection and attaches it to the ongoing
  // session if within a transaction.
  private Connection connection() throws SQLException  {
    if (ses.isWithinTransaction()) {
      Connection conn = ses.getAttached();

      if (conn == null) {
        conn = openConnection(false);

        // Store connection in the session, so it can be accessed
        // for other operations within the same transaction.
        ses.attach(conn);
      }

      return conn;
    }
    // Transaction can be null in case of simple load or put operation.
    else
      return openConnection(true);
  }

  // Opens JDBC connection.
  private Connection openConnection(boolean autocommit) throws SQLException {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(autocommit);

    return conn;
  }
}
public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
  // Skip single operations and open connection methods.
  // You can copy them from jdbc non-transactional or jdbc transactional examples.
  ...
  
  // This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
  @Override public Map<K, V> loadAll(Iterable<Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement(
        "select firstName, lastName from PERSONS where id=?")) {
        Map<K, V> loaded = new HashMap<>();
        
        for (Long key : keys) {
          st.setLong(1, key);
          
          try(ResultSet rs = st.executeQuery()) {
            if (rs.next())
              loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
          }
        }

        return loaded;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to loadAll: " + keys, e);
    }
  }
  
  // This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
  @Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();
          
          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());
          
          st.addBatch();
        }
        
				st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to writeAll: " + entries, e);
    }
  }
  
  // This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
  @Override public void deleteAll(Collection<Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        for (Long key : keys) {
          st.setLong(1, key);
          
          st.addBatch();
        }
        
				st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to deleteAll: " + keys, e);
    }
  }
}

Configuration

The following configuration parameters can be used to enable and configure write-behind caching via CacheConfiguration:

Setter Method
Description
Default

setWriteBehindEnabled(boolean)

Sets flag indicating whether write-behind is enabled.

false

setWriteBehindFlushSize(int)

Maximum size of the write-behind cache. If cache size exceeds this value, all cached items are flushed to the cache store and write cache is cleared. If this value is 0, then flush is performed according to the flush frequency interval. Note that you cannot set both, flush size and flush frequency, to 0.

10240

setWriteBehindFlushFrequency(long)

Frequency with which write-behind cache is flushed to the cache store in milliseconds. This value defines the maximum time interval between object insertion/deletion from the cache and the moment when corresponding operation is applied to the cache store. If this value is 0, then flush is performed according to the flush size. Note that you cannot set both, flush size and flush frequency, to 0.

5000 milliseconds

setWriteBehindFlushThreadCount(int)

Number of threads that will perform cache flushing.

1

setWriteBehindBatchSize(int)

Maximum batch size for write-behind cache store operations.

512

CacheStore interface can be set on IgniteConfiguration via a Factory in much the same way like CacheLoader and CacheWriter are being set.

For distributed cache configuration, Factory should be serializable.

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
          <property name="cacheStoreFactory">
    				<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        			<constructor-arg value="foo.bar.MyPersonStore"/>
    				</bean>
					</property>
    			<property name="readThrough" value="true"/>
          <property name="writeThrough"  value="true"/>
    		</bean>
    	</list>
    </property>
  ...
</bean>
IgniteConfiguration cfg = new IgniteConfiguration();

CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();

cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

CacheJdbcBlobStore

CacheJdbcBlobStore implementation is backed by JDBC. This implementation stores objects in the underlying database in the BLOB format. The Store will create table ENTRIES in the database to store the data. The table will have key and val fields.
If custom DDL and DML statements are provided, then table and field names should be consistent for all the statements, and sequence of parameters should be preserved.

Use the CacheJdbcBlobStoreFactory factory to pass CacheJdbcBlobStore to CacheConfiguration.

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
  
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
   <property name="cacheConfiguration">
     <list>
       <bean class="org.apache.ignite.configuration.CacheConfiguration">
         ...
           <property name="cacheStoreFactory">
             <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
               <property name="user" value = "user" />
               <property name="dataSourceBean" value = "simpleDataSource" />
             </bean>
           </property>
       </bean>
      </list>
    </property>
  ...
</bean>

CacheJdbcPojoStore

CacheJdbcPojoStore of CacheStore is backed by JDBC and POJO via reflection. This implementation stores objects in the underlying database using java beans mapping description via reflection.

Use the CacheJdbcPojoStoreFactory factory to pass CacheJdbcPojoStore to CacheConfiguration.

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
  
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
            <property name="cacheStoreFactory">
              <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                <property name="dataSourceBean" value = "simpleDataSource" />
              </bean>
            </property>
        </bean>
      </list>
    </property>
</bean>

CacheHibernateBlobStore

CacheHibernateBlobStore implementation is backed by Hibernate. This implementation
stores objects in the underlying database in BLOB format.

Use the CacheHibernateBlobStoreFactory factory to pass CacheHibernateBlobStore to CacheConfiguration.

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
           <property name="hibernateProperties">
             <props>
               <prop key="connection.url">jdbc:h2:mem:</prop>
               <prop key="hbm2ddl.auto">update</prop>
               <prop key="show_sql">true</prop>
             </props>
           </property>
         </bean>
       </list>
    </property>
  ...    
</bean>

Cassandra Cache Store

Refer to Cassandra Cache Store documentation to see how use Cassandra as an Apache Ignite persistent store.

Persistent Store

Write-through or read-through data to and from persistent storage.