Data Streamers

Stream large amounts of data into Ignite caches.

Overview

Data streamers are defined by the IgniteDataStreamer API and are built to inject large, continuous streams of data into Ignite caches. Data streamers are built in a scalable and fault-tolerant fashion and provide at-least-once delivery semantics for all the data streamed into Ignite.

Data streamers do not participate in Ignite Transactions.

IgniteDataStreamer

The main abstraction for fast streaming of large amounts of data into Ignite is IgniteDataStreamer, which internally batches entries together by key and collocates those batches with the nodes on which the data will be cached.

The high loading speed is achieved with the following techniques:

  • Entries that are mapped to the same cluster member will be batched together in a buffer.
  • Multiple buffers can coexist at the same time.
  • To avoid running out of memory, the data streamer limits the number of buffers it can process concurrently. The buffer size and parallelism are tunable, as shown in the sketch after this list.
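
A minimal tuning sketch using the IgniteDataStreamer properties that control this behavior (the values and the "myStreamCache" cache name are illustrative, not recommendations):

try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
    // Number of entries buffered per node before a batch is sent.
    stmr.perNodeBufferSize(1024);

    // Maximum number of parallel batch operations per node.
    stmr.perNodeParallelOperations(8);

    // Flush buffered data every 10 seconds, even if buffers are not full.
    stmr.autoFlushFrequency(10_000);

    // ... addData(...) calls ...
}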

To add data to the data streamer, call the IgniteDataStreamer.addData(...) method.

// Get the data streamer reference and stream data.
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {    
    // Stream entries.
    for (int i = 0; i < 100000; i++)
        stmr.addData(i, Integer.toString(i));
}
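
Note that addData(...) does not send each entry immediately: entries are buffered and transferred in batches, and any remaining data is flushed when the streamer is closed, as in the try-with-resources block above. If buffered entries need to reach the cache earlier, you can flush explicitly; a minimal sketch:

try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
    for (int i = 0; i < 100000; i++) {
        stmr.addData(i, Integer.toString(i));

        // Block until all previously added data is actually written to the cache.
        if (i > 0 && i % 10000 == 0)
            stmr.flush();
    }
} // close() flushes any remaining buffered data.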

Allow Overwrite

By default, the data streamer does not overwrite existing data: if it encounters an entry that is already in the cache, it skips it. This is the most efficient mode, as the data streamer does not have to worry about data versioning in the background.

If you anticipate that the data may already be in the streaming cache and needs to be overwritten, set the IgniteDataStreamer.allowOverwrite(true) property.
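
For example, a minimal sketch reusing the hypothetical "myStreamCache" from above:

try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
    // Entries whose keys already exist in the cache will be overwritten.
    stmr.allowOverwrite(true);

    stmr.addData(1, "updatedValue");
}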

Streamer, Cache Store, and AllowOverwrite

When the allowOverwrite property is false (the default), the persistent store is also skipped, even if the skipStore property is also false.

The cache store is invoked only when allowOverwrite is true.
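
A minimal sketch of streaming data through to the cache store, assuming the hypothetical "myStreamCache" is configured with one:

try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
    // Per the note above, the cache store is invoked only when this is true.
    stmr.allowOverwrite(true);

    // false is the default, shown here for clarity: streamed entries
    // are written through to the underlying cache store.
    stmr.skipStore(false);

    stmr.addData(1, "persistedValue");
}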

StreamReceiver

For cases when you need to execute custom logic instead of just adding new data, you can take advantage of the StreamReceiver API.

Stream receivers allow you to react to the streamed data in a collocated fashion, directly on the nodes where it will be cached. You can change the data or add any custom pre-processing logic before putting the data into the cache.

Note that StreamReceiver does not put data into the cache automatically. You need to call one of the cache.put(...) methods explicitly.
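
A minimal sketch of a custom receiver (the FilteringReceiver name and the filtering logic are illustrative, not part of the Ignite API):

// Receives batches of streamed entries on the node where they are cached,
// filters out empty values, and explicitly puts the rest into the cache.
class FilteringReceiver implements StreamReceiver<Integer, String> {
    @Override public void receive(IgniteCache<Integer, String> cache,
        Collection<Map.Entry<Integer, String>> entries) throws IgniteException {
        for (Map.Entry<Integer, String> entry : entries) {
            // Custom pre-processing: skip null or empty values.
            if (entry.getValue() != null && !entry.getValue().isEmpty())
                cache.put(entry.getKey(), entry.getValue());
        }
    }
}

The receiver is then attached to the streamer with stmr.receiver(new FilteringReceiver()).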

StreamTransformer

StreamTransformer is a convenience implementation of StreamReceiver which updates data in the stream cache based on its previous value. The update is collocated, i.e. it happens exactly on the cluster node where the data is stored.

In the example below, we use StreamTransformer to increment a counter for each distinct word found in the text stream.

CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("wordCountCache");

IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);

try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  // Allow data updates.
  stmr.allowOverwrite(true);

  // Configure data transformation to count instances of the same word.
  stmr.receiver(StreamTransformer.from((e, arg) -> {
    // Get current count.
    Long val = e.getValue();

    // Increment count by 1.
    e.setValue(val == null ? 1L : val + 1);

    return null;
  }));

  // Stream words into the streamer cache.
  for (String word : text)
    stmr.addData(word, 1L);
}
The same logic can be implemented with an anonymous class instead of a lambda:

CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("wordCountCache");

IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);

try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  // Allow data updates.
  stmr.allowOverwrite(true);

  // Configure data transformation to count instances of the same word.
  stmr.receiver(new StreamTransformer<String, Long>() {
    @Override public Object process(MutableEntry<String, Long> e, Object... args) {
      // Get current count.
      Long val = e.getValue();

      // Increment count by 1.
      e.setValue(val == null ? 1L : val + 1);

      return null;
    }
  });

  // Stream words into the streamer cache.
  for (String word : text)
    stmr.addData(word, 1L);
}
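
Note that both versions enable allowOverwrite, since the transformer must be able to update values that are already in the cache.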

StreamVisitor

StreamVisitor is another convenience implementation of StreamReceiver; it visits every key-value tuple in the stream. Note that the visitor does not update the cache: if the tuple needs to be stored, one of the cache.put(...) methods must be called explicitly.

In the example below, we have two caches: "marketData" and "instruments". We receive market data ticks and put them into the streamer for the "marketData" cache. The StreamVisitor for the "marketData" streamer is invoked on the cluster member mapped to the particular market symbol, and upon receiving individual market ticks it updates the "instruments" cache with the latest market price.

Note that we do not update the "marketData" cache at all, leaving it empty. We simply use it for collocated processing of the market data directly on the nodes where the data will be stored.

CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instruments");

// Cache for market data ticks streamed into the system.
IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);

// Cache for financial instruments.
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg);

try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer("marketData")) {
  // Note that we do not populate 'marketData' cache (it remains empty).
  // Instead we update the 'instruments' cache based on the latest market price.
  mktStmr.receiver(StreamVisitor.from((cache, e) -> {
    String symbol = e.getKey();
    Double tick = e.getValue();

    Instrument inst = instCache.get(symbol);

    if (inst == null)
      inst = new Instrument(symbol);

    // Update instrument price based on the latest market tick.
    inst.setHigh(Math.max(inst.getHigh(), tick));
    inst.setLow(Math.min(inst.getLow(), tick));
    inst.setLatest(tick);

    // Update instrument cache.
    instCache.put(symbol, inst);
  }));

  // Stream market data into Ignite.
  for (Map.Entry<String, Double> tick : marketData)
      mktStmr.addData(tick);
}
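
The Instrument class is not part of Ignite and is not shown in the original snippet; a minimal sketch consistent with the accessors used above (the field initialization is an assumption):

class Instrument {
    private final String symbol;
    private double latest;

    // Initialized so that the first tick becomes both the high and the low.
    private double high = Double.NEGATIVE_INFINITY;
    private double low = Double.POSITIVE_INFINITY;

    Instrument(String symbol) { this.symbol = symbol; }

    double getLatest() { return latest; }
    void setLatest(double latest) { this.latest = latest; }

    double getHigh() { return high; }
    void setHigh(double high) { this.high = high; }

    double getLow() { return low; }
    void setLow(double low) { this.low = low; }
}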
