Apache Ignite

Checkpointing

Overview

Checkpointing lets a job save its intermediate state. This is useful when a long-running job needs to protect its progress against node failures: after a failed node restarts, the job can load the saved checkpoint and continue from where it left off. The only requirement for checkpoint state is that it implement the java.io.Serializable interface.
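A minimal sketch of what the Serializable requirement means in practice, assuming a hypothetical state class WordCountState (not part of Ignite). It round-trips the state through plain Java serialization; how Ignite marshals checkpoints internally may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical intermediate job state; any Serializable type would do. */
class WordCountState implements Serializable {
    private static final long serialVersionUID = 1L;

    final int linesProcessed;
    final long wordsCounted;

    WordCountState(int linesProcessed, long wordsCounted) {
        this.linesProcessed = linesProcessed;
        this.wordsCounted = wordsCounted;
    }
}

class SerializableStateDemo {
    /** Serializes the state to bytes and reads it back, as a checkpoint store would need to. */
    static WordCountState roundTrip(WordCountState state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (WordCountState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // A state that cannot be serialized cannot be used as a checkpoint.
            throw new IllegalStateException("State is not serializable", e);
        }
    }

    public static void main(String[] args) {
        WordCountState restored = roundTrip(new WordCountState(1_000, 8_421L));
        System.out.println(restored.linesProcessed + " lines, " + restored.wordsCounted + " words");
    }
}
```

If a field of the state class is not itself serializable, the round trip fails, which is a quick way to check a state class before using it as a checkpoint.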

Checkpoints are available through the following methods on the ComputeTaskSession interface:

  • ComputeTaskSession.loadCheckpoint(String)
  • ComputeTaskSession.removeCheckpoint(String)
  • ComputeTaskSession.saveCheckpoint(String, Object)

@ComputeTaskSessionFullSupport annotation

Note that checkpointing is disabled by default for performance reasons. To enable it, attach the @ComputeTaskSessionFullSupport annotation to the task or closure class.

Master Node Failure Protection

One important use case for checkpoints that is not readily apparent is guarding against failure of the "master" node, the node that started the original execution. When the master node fails, Ignite does not know where to send the results of the job execution, so the results are discarded.

To handle this scenario, you can store the final result of each job execution as a checkpoint and have your logic re-run the entire task if the "master" node fails. In that case the re-run will be much faster, since all jobs can start from the saved checkpoints.
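The re-run idea can be sketched without a cluster. The example below is an illustration only: InMemoryCheckpointStore is a hypothetical stand-in for a shared checkpoint store, not an Ignite class, and the "jobs" simply square their IDs. It shows that a second run of the same task finds every final result already saved and does no recomputation.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a shared checkpoint store (hypothetical, not an Ignite class). */
class InMemoryCheckpointStore {
    private final Map<String, Object> data = new HashMap<>();

    void save(String key, Object state) { data.put(key, state); }

    Object load(String key) { return data.get(key); }
}

class MasterFailoverSketch {
    /** Counts how many jobs actually had to compute, to show what a re-run saves. */
    static int computations = 0;

    /** One job: reuse the final result if a checkpoint exists, otherwise compute and save it. */
    static int runJob(InMemoryCheckpointStore store, int jobId) {
        String key = "RESULT-" + jobId;
        Object saved = store.load(key);

        if (saved != null)
            return (Integer) saved;

        computations++;
        int result = jobId * jobId; // Placeholder for an expensive computation.
        store.save(key, result);

        return result;
    }

    /** The task: run all jobs and sum their results. */
    static int runTask(InMemoryCheckpointStore store, int jobCount) {
        int sum = 0;

        for (int jobId = 1; jobId <= jobCount; jobId++)
            sum += runJob(store, jobId);

        return sum;
    }

    public static void main(String[] args) {
        InMemoryCheckpointStore store = new InMemoryCheckpointStore();

        int first = runTask(store, 4);  // Imagine the master fails before receiving this result.
        int afterFirst = computations;
        int second = runTask(store, 4); // Re-run: every job finds its saved final result.

        // Prints "30 30 4 4": same result, and no extra computations on the re-run.
        System.out.println(first + " " + second + " " + afterFirst + " " + computations);
    }
}
```

In a real deployment the store would need to outlive the master node, which is why the shared file system, cache, database, and S3 implementations described later matter for this use case.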

Setting Checkpoints

Every compute job can periodically checkpoint itself by calling the ComputeTaskSession.saveCheckpoint(...) method.

If a job saves checkpoints, then at the beginning of its execution it should check whether a checkpoint is available and resume from the last saved checkpoint.

IgniteCompute compute = ignite.compute();

// The closure is an IgniteCallable, so it is submitted with call() rather than run().
compute.call(new CheckpointsCallable());

/**
 * Note that this class is annotated with @ComputeTaskSessionFullSupport
 * annotation to enable checkpointing.
 */
@ComputeTaskSessionFullSupport
private static class CheckpointsCallable implements IgniteCallable<Object> {
  // Task session (injected on closure instantiation).
  @TaskSessionResource
  private ComputeTaskSession ses;

  @Override
  public Object call() throws IgniteException {
    // Try to retrieve step1 result.
    Object res1 = ses.loadCheckpoint("STEP1");

    if (res1 == null) {
      res1 = computeStep1(); // Do some computation.

      // Save step1 result.
      ses.saveCheckpoint("STEP1", res1);
    }

    // Try to retrieve step2 result.
    Object res2 = ses.loadCheckpoint("STEP2");

    if (res2 == null) {
      res2 = computeStep2(res1); // Do some computation.

      // Save step2 result.
      ses.saveCheckpoint("STEP2", res2);
    }

    ...
  }
}

CheckpointSpi

In Ignite, checkpointing functionality is provided by CheckpointSpi, which has the following out-of-the-box implementations:

| Class | Description |
| --- | --- |
| SharedFsCheckpointSpi | This implementation uses a shared file system to store checkpoints. |
| CacheCheckpointSpi | This implementation uses a cache to store checkpoints. |
| JdbcCheckpointSpi | This implementation uses a database to store checkpoints. |
| S3CheckpointSpi | This implementation uses Amazon S3 to store checkpoints. |

CheckpointSpi is set in IgniteConfiguration, which is passed to the Ignition class at startup. By default, a no-op checkpoint SPI is used.

File System Checkpoint Configuration

The following configuration parameters can be used to configure SharedFsCheckpointSpi:

| Setter Method | Description | Default |
| --- | --- | --- |
| setDirectoryPaths(Collection) | Sets directory paths to the shared folders where checkpoints are stored. Each path can be either absolute or relative to the path specified in the IGNITE_HOME environment or system variable. | IGNITE_HOME/work/cp/sharedfs |

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi">
      <!-- Change to shared directory path in your environment. -->
      <property name="directoryPaths">
        <list>
          <value>/my/directory/path</value>
          <value>/other/directory/path</value>
        </list>
      </property>
    </bean>
  </property>
  ...
</bean>

IgniteConfiguration cfg = new IgniteConfiguration();
 
SharedFsCheckpointSpi checkpointSpi = new SharedFsCheckpointSpi();
 
// List of checkpoint directories where all files are stored.
Collection<String> dirPaths = new ArrayList<String>();
 
dirPaths.add("/my/directory/path");
dirPaths.add("/other/directory/path");
 
// Override default directory path.
checkpointSpi.setDirectoryPaths(dirPaths);
 
// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);
 
// Starts Ignite node.
Ignition.start(cfg);
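To illustrate the "absolute or relative to IGNITE_HOME" rule for directory paths, here is a small sketch using only java.nio.file. The resolve helper and the /opt/ignite base directory are assumptions made for the example; this is not how SharedFsCheckpointSpi is necessarily implemented.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class CheckpointPathSketch {
    /** Resolves a configured directory against a base directory standing in for IGNITE_HOME. */
    static Path resolve(String igniteHome, String configuredPath) {
        // Path.resolve returns the argument itself when it is absolute
        // and appends it to the base directory when it is relative.
        return Paths.get(igniteHome).resolve(configuredPath).normalize();
    }

    public static void main(String[] args) {
        // Relative path: ends up under the assumed IGNITE_HOME.
        System.out.println(resolve("/opt/ignite", "work/cp/sharedfs"));

        // Absolute path: used as given (on a Unix-like file system).
        System.out.println(resolve("/opt/ignite", "/my/directory/path"));
    }
}
```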

Cache Checkpoint Configuration

CacheCheckpointSpi is a cache-based implementation of the checkpoint SPI. Checkpoint data is stored in the Ignite data grid in a pre-configured cache.

The following configuration parameters can be used to configure CacheCheckpointSpi:

| Setter Method | Description | Default |
| --- | --- | --- |
| setCacheName(String) | Sets cache name to use for storing checkpoints. | checkpoints |
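A configuration sketch for the cache-based SPI, following the same pattern as the other implementations. It assumes ignite-core is on the classpath and that the cache holding the checkpoints is declared in the same node configuration; the cache settings shown are the bare minimum and would likely need tuning for real use.

```java
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.cache.CacheCheckpointSpi;

class CacheCheckpointConfigSketch {
    public static void main(String[] args) {
        // Cache that will hold checkpoint data (assumed name, matching the SPI default).
        CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>();
        cacheCfg.setName("checkpoints");

        CacheCheckpointSpi checkpointSpi = new CacheCheckpointSpi();

        // Point the SPI at the cache declared above.
        checkpointSpi.setCacheName("checkpoints");

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setCacheConfiguration(cacheCfg);

        // Override default checkpoint SPI.
        cfg.setCheckpointSpi(checkpointSpi);

        // Start Ignite node.
        Ignition.start(cfg);
    }
}
```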

Database Checkpoint Configuration

JdbcCheckpointSpi uses a database to store checkpoints. All checkpoints are stored in a database table and are available from all nodes in the grid. Note that every node must have access to the database. A job state can be saved on one node and loaded on another (e.g., if a job is restarted on a different node after a node failure).

The following configuration parameters can be used to configure JdbcCheckpointSpi (all are optional):

| Setter Method | Description | Default |
| --- | --- | --- |
| setDataSource(DataSource) | Sets DataSource to use for database access. | No value |
| setCheckpointTableName(String) | Sets checkpoint table name. | CHECKPOINTS |
| setKeyFieldName(String) | Sets checkpoint key field name. | NAME |
| setKeyFieldType(String) | Sets checkpoint key field type. The field should have a corresponding SQL string type (VARCHAR, for example). | VARCHAR(256) |
| setValueFieldName(String) | Sets checkpoint value field name. | VALUE |
| setValueFieldType(String) | Sets checkpoint value field type. The field should have a corresponding SQL BLOB type. The default value, BLOB, does not work for all databases. For example, with HSQL DB the type should be longvarbinary. | BLOB |
| setExpireDateFieldName(String) | Sets checkpoint expiration date field name. | EXPIRE_DATE |
| setExpireDateFieldType(String) | Sets checkpoint expiration date field type. The field should have a corresponding SQL DATETIME type. | DATETIME |
| setNumberOfRetries(int) | Sets number of retries in case of any database errors. | 2 |
| setUser(String) | Sets checkpoint database user name. Note that authentication is performed only if both user and password are set. | No value |
| setPwd(String) | Sets checkpoint database password. | No value |
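To make the table and field settings concrete, the sketch below assembles the kind of table definition these parameters describe. The createTableSql helper and the exact statement shape (including the PRIMARY KEY clause) are assumptions for illustration; the statement JdbcCheckpointSpi actually issues may differ.

```java
class JdbcCheckpointTableSketch {
    /** Builds an illustrative CREATE TABLE statement from the table name, field names and field types. */
    static String createTableSql(String table,
                                 String keyName, String keyType,
                                 String valName, String valType,
                                 String expName, String expType) {
        return "CREATE TABLE " + table + " ("
            + keyName + " " + keyType + " PRIMARY KEY, "
            + valName + " " + valType + ", "
            + expName + " " + expType + ")";
    }

    public static void main(String[] args) {
        // Default names and types listed for JdbcCheckpointSpi.
        System.out.println(createTableSql(
            "CHECKPOINTS", "NAME", "VARCHAR(256)", "VALUE", "BLOB", "EXPIRE_DATE", "DATETIME"));

        // HSQL DB variant: the value field type is changed to longvarbinary.
        System.out.println(createTableSql(
            "CHECKPOINTS", "NAME", "VARCHAR(256)", "VALUE", "longvarbinary", "EXPIRE_DATE", "DATETIME"));
    }
}
```

Seeing the statement spelled out makes it easier to judge which field types need overriding for a particular database before the node starts.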

Apache DBCP

The Apache DBCP project provides various wrappers for data sources and connection pools. You can use these wrappers as Spring beans to configure this SPI from a Spring configuration file or from code. Refer to the Apache DBCP project for more information.

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpi">
      <property name="dataSource">
        <ref bean="anyPooledDataSourceBean"/>
      </property>
      <property name="checkpointTableName" value="CHECKPOINTS"/>
      <property name="user" value="test"/>
      <property name="pwd" value="test"/>
    </bean>
  </property>
  ...
</bean>

JdbcCheckpointSpi checkpointSpi = new JdbcCheckpointSpi();
 
javax.sql.DataSource ds = ... // Set datasource.
 
// Set database checkpoint SPI parameters.
checkpointSpi.setDataSource(ds);
checkpointSpi.setUser("test");
checkpointSpi.setPwd("test");
 
IgniteConfiguration cfg = new IgniteConfiguration();
 
// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);
 
// Start Ignite node.
Ignition.start(cfg);

Amazon S3 Checkpoint Configuration

S3CheckpointSpi uses Amazon S3 storage to store checkpoints. For information about Amazon S3, visit http://aws.amazon.com/.

The following configuration parameters can be used to configure S3CheckpointSpi:

| Setter Method | Description | Default |
| --- | --- | --- |
| setAwsCredentials(AWSCredentials) | Sets AWS credentials to use for storing checkpoints. | No value (must be provided) |
| setClientConfiguration(ClientConfiguration) | Sets AWS client configuration. | No value |
| setBucketNameSuffix(String) | Sets bucket name suffix. | default-bucket |

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi">
      <property name="awsCredentials">
        <bean class="com.amazonaws.auth.BasicAWSCredentials">
          <constructor-arg value="YOUR_ACCESS_KEY_ID" />
          <constructor-arg value="YOUR_SECRET_ACCESS_KEY" />
        </bean>
      </property>
    </bean>
  </property>
  ...
</bean>

IgniteConfiguration cfg = new IgniteConfiguration();
 
S3CheckpointSpi spi = new S3CheckpointSpi();
 
AWSCredentials cred = new BasicAWSCredentials(YOUR_ACCESS_KEY_ID, YOUR_SECRET_ACCESS_KEY);
 
spi.setAwsCredentials(cred);
 
spi.setBucketNameSuffix("checkpoints");
 
// Override default checkpoint SPI.
cfg.setCheckpointSpi(spi);
 
// Start Ignite node.
Ignition.start(cfg);
