Apache Ignite

Distributed Closures

Broadcast and load-balance closure execution across cluster nodes.

Overview

The Ignite compute grid lets you broadcast and load-balance any closure within the cluster or a cluster group, including plain Java runnables and callables.

Broadcast Methods

All broadcast(...) methods broadcast a given job to all nodes in the cluster or cluster group.

final Ignite ignite = Ignition.ignite();

// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));

// Same broadcast in asynchronous mode.
final Ignite ignite = Ignition.ignite();

// Limit broadcast to remote nodes only and 
// enable asynchronous mode.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()).withAsync();

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));

ComputeTaskFuture<?> fut = compute.future();

fut.listen(f -> System.out.println("Finished sending broadcast job."));

// Same broadcast with an anonymous class (Java 7 style).
final Ignite ignite = Ignition.ignite();

// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(
    new IgniteRunnable() {
        @Override public void run() {
            // Print ID of remote node on remote node.
            System.out.println(">>> Hello Node: " + ignite.cluster().localNode().id());
        }
    }
);

// Asynchronous broadcast with anonymous classes (Java 7 style).
final Ignite ignite = Ignition.ignite();

// Limit broadcast to remote nodes only and
// enable asynchronous mode.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()).withAsync();

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(
    new IgniteRunnable() {
        @Override public void run() {
            // Print ID of remote node on remote node.
            System.out.println(">>> Hello Node: " + ignite.cluster().localNode().id());
        }
    }
);

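ComputeTaskFuture<?> fut = compute.future();

// A wildcard type cannot be instantiated, so the listener is declared
// against IgniteFuture<?>, which listen(...) accepts.
fut.listen(new IgniteInClosure<IgniteFuture<?>>() {
    @Override public void apply(IgniteFuture<?> f) {
        System.out.println("Finished sending broadcast job to cluster.");
    }
});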
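
To make the broadcast semantics concrete without a running cluster, here is a minimal local analogy that uses only the JDK. It is not Ignite API: each single-threaded executor merely stands in for one cluster node, and the names `LocalBroadcastSketch`, `broadcastLocally` and `runDemo` are hypothetical. The sketch shows the same job being executed exactly once per "node", with a completion callback playing the role of the future listener above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Local analogy only: executors stand in for cluster nodes (assumption, not Ignite API).
final class LocalBroadcastSketch {
    /** Submits the same job to every "node" and returns a future that completes when all runs finish. */
    static CompletableFuture<Void> broadcastLocally(List<ExecutorService> nodes, Runnable job) {
        List<CompletableFuture<Void>> futs = new ArrayList<>();

        for (ExecutorService node : nodes)
            futs.add(CompletableFuture.runAsync(job, node));

        return CompletableFuture.allOf(futs.toArray(new CompletableFuture[0]));
    }

    /** Broadcasts a counting job to {@code nodeCount} "nodes" and returns how many times it ran. */
    static int runDemo(int nodeCount) {
        List<ExecutorService> nodes = new ArrayList<>();

        for (int i = 0; i < nodeCount; i++)
            nodes.add(Executors.newSingleThreadExecutor());

        AtomicInteger executions = new AtomicInteger();

        try {
            CompletableFuture<Void> fut = broadcastLocally(nodes, executions::incrementAndGet);

            // Comparable to registering a listener on the compute future.
            fut.thenRun(() -> System.out.println("Finished broadcast job."));

            // Wait until the job has run on every "node".
            fut.join();

            return executions.get();
        }
        finally {
            for (ExecutorService node : nodes)
                node.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Executions: " + runDemo(3));
    }
}
```

With three "nodes" the job runs three times, once per executor, which mirrors how a broadcast job runs once on every node of the targeted cluster group.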

Call and Run Methods

All call(...) and run(...) methods execute either individual jobs or collections of jobs on the cluster or a cluster group.

Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
 
// Iterate through all words in the sentence and create callable jobs.
for (String word : "How many characters".split(" "))
    calls.add(word::length);

// Execute collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);

// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

// Run each word as a separate job.
IgniteCompute compute = ignite.compute();

// Iterate through all words and print 
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" "))
    // Run on some cluster node.
    compute.run(() -> System.out.println(word));

// Asynchronous variant of the callable example.
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
 
// Iterate through all words in the sentence and create callable jobs.
for (String word : "Count characters using callable".split(" "))
    calls.add(word::length);

// Enable asynchronous mode.
IgniteCompute asyncCompute = ignite.compute().withAsync();

// Asynchronously execute collection of callables on the cluster.
asyncCompute.call(calls);

// The explicit type argument lets fut.get() return Collection<Integer>.
asyncCompute.<Collection<Integer>>future().listen(fut -> {
    // Total number of characters.
    int total = fut.get().stream().mapToInt(Integer::intValue).sum(); 
  
    System.out.println("Total number of characters: " + total);
});

// Asynchronous variant of the run example.
IgniteCompute asyncCompute = ignite.compute().withAsync();

Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();

// Iterate through all words and print 
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
    // Asynchronously run on some cluster node.
    asyncCompute.run(() -> System.out.println(word));

    futs.add(asyncCompute.future());
}

// Wait for completion of all futures.
futs.stream().forEach(ComputeTaskFuture::get);

// Callable example with anonymous classes (Java 7 style).
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
 
// Iterate through all words in the sentence and create callable jobs.
for (final String word : "Count characters using callable".split(" ")) {
    calls.add(new IgniteCallable<Integer>() {
        @Override public Integer call() throws Exception {
            return word.length(); // Return word length.
        }
    });
}
 
// Execute collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);

int total = 0;

// Total number of characters.
// Looks much better in Java 8.
for (Integer i : res)
    total += i;

// Asynchronous run example with anonymous classes (Java 7 style).
IgniteCompute asyncCompute = ignite.compute().withAsync();

Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();

// Iterate through all words and print
// each word on a different cluster node.
for (final String word : "Print words on different cluster nodes".split(" ")) {
    // Asynchronously run on some cluster node.
    asyncCompute.run(new IgniteRunnable() {
        @Override public void run() {
            System.out.println(word);
        }
    });

    futs.add(asyncCompute.future());
}

// Wait for completion of all futures.
for (ComputeTaskFuture<?> f : futs)
    f.get();
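
The fan-out and aggregate pattern behind call(...) with a collection of jobs can also be tried locally. The following is a rough JDK-only analogy, not Ignite API: a thread pool stands in for the cluster, `Callable` stands in for `IgniteCallable`, and the names `LocalCallSketch` and `sumOfWordLengths` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Local analogy only: the thread pool stands in for the cluster (assumption, not Ignite API).
final class LocalCallSketch {
    /** Creates one callable per word, runs them all on the pool, and sums the results. */
    static int sumOfWordLengths(String sentence) {
        List<Callable<Integer>> calls = new ArrayList<>();

        // One job per word, as in the callable examples above.
        for (String word : sentence.split(" "))
            calls.add(word::length);

        ExecutorService pool = Executors.newFixedThreadPool(3);

        try {
            int total = 0;

            // invokeAll blocks until every job has finished.
            for (Future<Integer> f : pool.invokeAll(calls))
                total += f.get();

            return total;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Job execution failed", e);
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints 17 (3 + 4 + 10).
        System.out.println(sumOfWordLengths("How many characters"));
    }
}
```

The caller only sees the collected results; which worker ran which job is left to the executor, much as Ignite decides which node runs each callable.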

Apply Methods

A closure is a block of code that packages its body, together with any outside variables it uses, as a function object. You can then pass such a function object anywhere you can pass a variable, and execute it. All apply(...) methods execute closures on the cluster.
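
As a small illustration of that definition, the sketch below uses only the JDK's `java.util.function.Function` rather than Ignite types. The names `ClosureSketch`, `lengthWithSuffix` and `applyToAll` are hypothetical. It shows a function object capturing an outside variable and then being handed to another method, which executes it once per argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// JDK-only illustration of a closure; names are hypothetical, not Ignite API.
final class ClosureSketch {
    /** Builds a function object that captures the outside variable {@code suffix}. */
    static Function<String, Integer> lengthWithSuffix(String suffix) {
        // The lambda body and the captured 'suffix' travel together as one object.
        return word -> (word + suffix).length();
    }

    /** Receives the closure like any other value and executes it once per argument. */
    static List<Integer> applyToAll(Function<String, Integer> closure, List<String> args) {
        List<Integer> results = new ArrayList<>();

        for (String arg : args)
            results.add(closure.apply(arg));

        return results;
    }

    public static void main(String[] args) {
        Function<String, Integer> closure = lengthWithSuffix("!");

        // Prints [4, 5, 11].
        System.out.println(applyToAll(closure, Arrays.asList("How", "many", "characters")));
    }
}
```

In Ignite the corresponding type is `IgniteClosure`, which is also serializable so that the function object and its captured state can be shipped to another node.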

IgniteCompute compute = ignite.compute();

// Execute the closure once for every word; the jobs are load-balanced across cluster nodes.
Collection<Integer> res = compute.apply(
    String::length,
    Arrays.asList("How many characters".split(" "))
);
     
// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

// Asynchronous variant of the apply example: enable asynchronous mode.
IgniteCompute asyncCompute = ignite.compute().withAsync();

// Execute the closure once for every word in the collection.
// In asynchronous mode apply(...) returns immediately, so the
// result is read from the future instead of the return value.
asyncCompute.apply(
    String::length,
    Arrays.asList("How many characters".split(" "))
);

// The explicit type argument lets fut.get() return Collection<Integer>.
asyncCompute.<Collection<Integer>>future().listen(fut -> {
    // Total number of characters.
    int total = fut.get().stream().mapToInt(Integer::intValue).sum(); 
  
    System.out.println("Total number of characters: " + total);
});

// Apply example with an anonymous class (Java 7 style).
// Execute closure on all cluster nodes.
Collection<Integer> res = ignite.compute().apply(
    new IgniteClosure<String, Integer>() {
        @Override public Integer apply(String word) {
            // Return number of letters in the word.
            return word.length();
        }
    },
    Arrays.asList("Count characters using closure".split(" "))
);
     
int sum = 0;
 
// Add up individual word lengths received from remote nodes
for (int len : res)
    sum += len;
