
MapReduce & ForkJoin

Execute MapReduce and ForkJoin tasks in memory.

Overview

ComputeTask is the Ignite abstraction for simplified in-memory MapReduce, which is also very close to the ForkJoin paradigm. Pure MapReduce was never built for performance and works well only for offline, batch-oriented processing (e.g. Hadoop MapReduce). However, when computing on data that resides in memory, real-time low latency and high throughput usually take the highest priority, and simplicity of the API becomes very important as well. With that in mind, Ignite introduced the ComputeTask API, which is a lightweight MapReduce (or ForkJoin) implementation.

Use ComputeTask only when you need fine-grained control over the job-to-node mapping or custom failover logic. For all other cases you should use simple closure executions on the cluster, documented in the Distributed Closures section.
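For comparison, a simple closure execution needs no task class at all. The sketch below assumes a started Ignite instance in a variable named ignite (and the usual Ignite imports):

IgniteCompute compute = ignite.compute();

// Broadcast a closure to every node; no explicit job-to-node
// mapping or custom failover logic is needed.
compute.broadcast(() -> System.out.println(">>> Hello from a cluster node."));

// Apply a closure to each element of a collection and collect
// the results; Ignite balances the load across nodes.
Collection<Integer> lengths = compute.apply(
    (String word) -> word.length(),
    Arrays.asList("Hello", "Grid", "Enabled", "World"));
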

ComputeTask

ComputeTask defines jobs to execute on the cluster and the mappings of those jobs to nodes. It also defines how to process (reduce) the job results. All IgniteCompute.execute(...) methods execute the given task on the grid. User applications should implement the map(...) and reduce(...) methods of the ComputeTask interface.

Tasks are defined by implementing two or three methods of the ComputeTask interface:

Map Method

Method map(...) instantiates the jobs and maps them to worker nodes. The method receives the collection of cluster nodes on which the task is run and the task argument. The method should return a map with jobs as keys and mapped worker nodes as values. The jobs are then sent to the mapped nodes and executed there.

Refer to ComputeTaskSplitAdapter for a simplified implementation of the map(...) method.

Result Method

Method result(...) is called each time a job completes on some cluster node. It receives the result returned by the completed job, as well as the list of all the job results received so far. The method should return a ComputeJobResultPolicy instance, indicating what to do next:

  • WAIT - wait for all remaining jobs to complete (if any)
  • REDUCE - immediately move to the reduce step, discarding all remaining jobs and any results not yet received
  • FAILOVER - fail the job over to another node (see Fault Tolerance)

All the received job results will be available in the reduce(...) method as well.
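For instance, a search-style task can override result(...) to stop early once any job finds a match. The override below is a hypothetical sketch, not part of the examples in this section:

@Override
public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
  // Fail the job over to another node if it threw an exception.
  if (res.getException() != null)
    return ComputeJobResultPolicy.FAILOVER;

  // Found what we were looking for - reduce immediately,
  // discarding any jobs that have not completed yet.
  if (res.getData() != null)
    return ComputeJobResultPolicy.REDUCE;

  // Otherwise keep waiting for the remaining jobs.
  return ComputeJobResultPolicy.WAIT;
}
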

Reduce Method

Method reduce(...) is called on the reduce step, when all the jobs have completed (or when the REDUCE result policy was returned from the result(...) method). The method receives a list of all completed results and should return the final result of the computation.

Compute Task Adapters

It is not necessary to implement all three methods of the ComputeTask API every time you need to define a computation. There are a number of helper classes that let you describe only a particular piece of your logic, leaving the rest for Ignite to handle automatically.

ComputeTaskAdapter

ComputeTaskAdapter defines a default implementation of the result(...) method, which returns the FAILOVER policy if a job threw an exception and the WAIT policy otherwise, thus waiting for all jobs to finish with a result.

ComputeTaskSplitAdapter

ComputeTaskSplitAdapter extends ComputeTaskAdapter and adds the capability to automatically assign jobs to nodes. It hides the map(...) method and adds a new split(...) method, in which the user only needs to provide a collection of jobs to be executed (the mapping of those jobs to nodes is handled automatically by the adapter in a load-balanced fashion).

This adapter is especially useful in homogeneous environments where all nodes are equally suitable for executing jobs and the mapping step can be done implicitly.

ComputeJob

All jobs spawned by a task are implementations of the ComputeJob interface. The execute() method of this interface defines the job logic and should return a job result. The cancel() method defines the logic to run if the job is discarded (for example, when the task decides to reduce immediately or to cancel the execution).

ComputeJobAdapter

Convenience adapter which provides a no-op implementation of the cancel() method.
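A job that wants to react to cancellation can override cancel() itself and check a flag from its main loop. A minimal sketch (the class name and loop bound are illustrative):

private static class LongRunningJob extends ComputeJobAdapter {
  // Flag flipped by cancel() when the task discards this job.
  private volatile boolean cancelled;

  @Override public void cancel() {
    cancelled = true;
  }

  @Override public Object execute() {
    int processed = 0;

    // Check the cancellation flag periodically so the job can
    // exit early when the task reduces or cancels.
    while (!cancelled && processed < 1_000_000)
      processed++;

    return processed;
  }
}
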

Example

Here is an example of ComputeTask and ComputeJob implementations.

IgniteCompute compute = ignite.compute();

// Execute the task on the cluster and wait for its completion.
int cnt = compute.execute(CharacterCountTask.class, "Hello Grid Enabled World!");
 
System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");
 
/**
 * Task to count non-white-space characters in a phrase.
 */
private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> {
  // 1. Splits the received string into words
  // 2. Creates a child job for each word
  // 3. Sends created jobs to other nodes for processing. 
  @Override 
  protected Collection<? extends ComputeJob> split(int gridSize, String arg) {
    String[] words = arg.split(" ");

    List<ComputeJob> jobs = new ArrayList<>(words.length);

    for (final String word : words) {
      jobs.add(new ComputeJobAdapter() {
        @Override public Object execute() {
          System.out.println(">>> Printing '" + word + "' from compute job.");

          // Return number of letters in the word.
          return word.length();
        }
      });
    }

    return jobs;
  }

  @Override 
  public Integer reduce(List<ComputeJobResult> results) {
    int sum = 0;

    for (ComputeJobResult res : results)
      sum += res.<Integer>getData();

    return sum;
  }
}
The same task can be implemented by extending ComputeTaskAdapter and providing the map(...) method explicitly:

IgniteCompute compute = ignite.compute();

// Execute the task on the cluster and wait for its completion.
int cnt = compute.execute(CharacterCountTask.class, "Hello Grid Enabled World!");
 
System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");
 
 
/**
 * Task to count non-white-space characters in a phrase.
 */
private static class CharacterCountTask extends ComputeTaskAdapter<String, Integer> {
    // 1. Splits the received string into words
    // 2. Creates a child job for each word
    // 3. Sends created jobs to other nodes for processing. 
    @Override 
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
        String[] words = arg.split(" ");
      
        Map<ComputeJob, ClusterNode> map = new HashMap<>(words.length);
        
        Iterator<ClusterNode> it = subgrid.iterator();
         
        for (final String word : words) {
            // If we used all nodes, restart the iterator.
            if (!it.hasNext())
                it = subgrid.iterator();
             
            ClusterNode node = it.next();
                
            map.put(new ComputeJobAdapter() {
                @Override public Object execute() {
                    System.out.println(">>> Printing '" + word + "' on this node from grid job.");
                  
                    // Return number of letters in the word.
                    return word.length();
                }
             }, node);
        }
      
        return map;
    }
 
    @Override 
    public Integer reduce(List<ComputeJobResult> results) {
        int sum = 0;
      
        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();
      
        return sum;
    }
}

Distributed Task Session

A distributed task session is created for every task execution. It is defined by the ComputeTaskSession interface. The task session is visible to the task and to all the jobs spawned by it, so attributes set on a task or on a job can be accessed by other jobs. The task session also allows jobs to receive notifications when attributes are set, or to wait for an attribute to be set.

The sequence in which session attributes are set is consistent across the task and all job siblings within it. There will never be a case when one job sees attribute A before attribute B, and another job sees attribute B before A.

In the example below, we have all jobs synchronize on STEP1 before moving on to STEP2.

@ComputeTaskSessionFullSupport annotation

Note that distributed task session attributes are disabled by default for performance reasons. To enable them, attach the @ComputeTaskSessionFullSupport annotation to the task class.

IgniteCompute compute = ignite.compute();

compute.execute(new TaskSessionAttributesTask(), null);

/**
 * Task demonstrating distributed task session attributes.
 * Note that task session attributes are enabled only if
 * @ComputeTaskSessionFullSupport annotation is attached.
 */
@ComputeTaskSessionFullSupport
private static class TaskSessionAttributesTask extends ComputeTaskSplitAdapter<Object, Object> {
  @Override 
  protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
    Collection<ComputeJob> jobs = new LinkedList<>();

    // Generate jobs by number of nodes in the grid.
    for (int i = 0; i < gridSize; i++) {
      jobs.add(new ComputeJobAdapter(arg) {
        // Auto-injected task session.
        @TaskSessionResource
        private ComputeTaskSession ses;
        
        // Auto-injected job context.
        @JobContextResource
        private ComputeJobContext jobCtx;

        @Override 
        public Object execute() {
          // Perform STEP1.
          ...
          
          // Tell other jobs that STEP1 is complete.
          ses.setAttribute(jobCtx.getJobId(), "STEP1");
          
          // Wait for other jobs to complete STEP1.
          for (ComputeJobSibling sibling : ses.getJobSiblings())
            ses.waitForAttribute(sibling.getJobId(), "STEP1", 0);
          
          // Move on to STEP2.
          ...
        }
      });
    }

    return jobs;
  }
               
  @Override 
  public Object reduce(List<ComputeJobResult> results) {
    // No-op.
    return null;
  }
}
