Lookback

The Lookback feature in Fluvio SmartModules gives a SmartModule access to records already in the data stream before it starts processing new ones. This allows the SmartModule to build its internal state from what the topic currently contains.

If configured, the Lookback phase is guaranteed to take place after the init method but before the first record is processed.
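The ordering guarantee can be pictured with a small plain-Rust model. It is a sketch, not the Fluvio runtime: the `Module` trait, the `Recorder` type and the `run` driver are hypothetical names that only illustrate the sequence init, then look_back once per lookback record, then filter once per new record. An empty lookback list models an empty topic, in which case look_back is never called.

```rust
/// Hypothetical stand-in for a SmartModule's hooks. It models call order only;
/// it is not the Fluvio runtime API.
trait Module {
    fn init(&mut self);
    fn look_back(&mut self, record: &str);
    fn filter(&mut self, record: &str) -> bool;
}

/// Hypothetical module that logs every hook call and passes all records.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl Module for Recorder {
    fn init(&mut self) {
        self.log.push("init".to_string());
    }

    fn look_back(&mut self, record: &str) {
        self.log.push(format!("look_back({record})"));
    }

    fn filter(&mut self, record: &str) -> bool {
        self.log.push(format!("filter({record})"));
        true
    }
}

/// Hypothetical driver: init once, then every lookback record, and only then
/// the live records. Returns the live records that passed the filter.
fn run<M: Module>(module: &mut M, lookback: &[&str], live: &[&str]) -> Vec<String> {
    module.init();
    for record in lookback {
        module.look_back(record);
    }
    let mut passed = Vec::new();
    for record in live {
        if module.filter(record) {
            passed.push(record.to_string());
        }
    }
    passed
}
```

Running it with lookback records "1" and "2" and a live record "3" logs `init`, `look_back(1)`, `look_back(2)` and finally `filter(3)`.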

To activate Lookback, a SmartModule must meet the following criteria:

  • A function annotated with the #[smartmodule(look_back)] macro must be present in the code. The requested records are passed to the SmartModule through this function:
#[smartmodule(look_back)]
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
    ...
}
  • The lookback parameter is specified in the transform configuration. It defines the set of lookback records that the SmartModule receives. Supported ways to define the set:
    • by size (the last N records in the topic)
    • by age (records younger than the specified age)
    • by size and age (at most the last N records younger than the specified age)
transforms:
  - uses: local/filter-with-lookback@0.1.0
    lookback:
      age: 30m # only records younger than 30 minutes
      last: 10 # at most the last 10 of those records

If the Fluvio topic is empty, look_back is never called.
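The three ways of defining the set can be sketched as a selection function. This is an illustration under stated assumptions, not Fluvio's implementation: `StoredRecord`, `Lookback` and `select` are hypothetical, timestamps are plain milliseconds, and the age limit is assumed to apply before the size limit (the order does not matter when timestamps grow with offsets). An empty topic yields an empty selection, so there is nothing to pass to look_back.

```rust
use std::time::Duration;

/// Hypothetical stored record: a value plus its timestamp in milliseconds.
struct StoredRecord {
    value: &'static str,
    timestamp_ms: u64,
}

/// Hypothetical mirror of the `lookback` config; either field may be set.
struct Lookback {
    last: Option<usize>,
    age: Option<Duration>,
}

/// Returns the records that would be handed to `look_back`, oldest first.
/// Assumes records are stored in offset order with non-decreasing timestamps.
fn select(records: &[StoredRecord], cfg: &Lookback, now_ms: u64) -> Vec<&'static str> {
    // Age limit: keep only records younger than `age`, if one is configured.
    let young: Vec<&StoredRecord> = records
        .iter()
        .filter(|r| match cfg.age {
            Some(age) => now_ms.saturating_sub(r.timestamp_ms) < age.as_millis() as u64,
            None => true,
        })
        .collect();
    // Size limit: keep at most the last N of the remaining records, if configured.
    let skip = cfg.last.map_or(0, |n| young.len().saturating_sub(n));
    young[skip..].iter().map(|r| r.value).collect()
}
```

With `age: 30m` and `last: 10`, as in the configuration above, a topic holding records that are 60, 20, 10 and 1 minutes old would yield the last three.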


Monotonically increasing values

This is an example of a SmartModule that uses the Lookback functionality. It reads the last record from the topic and then only allows records whose value is greater than the previous one.

use std::sync::atomic::{AtomicI32, Ordering::SeqCst};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

// Filter threshold: the value stored by look_back, or the last accepted record.
static PREV: AtomicI32 = AtomicI32::new(0);

// Passes a record only if its numeric value is greater than the threshold.
#[smartmodule(filter)]
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
    let string = std::str::from_utf8(record.value.as_ref())?;
    let current: i32 = string.parse()?;
    let last = PREV.load(SeqCst);
    if current > last {
        // Accept the record and raise the threshold to its value.
        PREV.store(current, SeqCst);
        Ok(true)
    } else {
        Ok(false)
    }
}

// Called for each lookback record before filtering starts; stores its value.
#[smartmodule(look_back)]
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
    let string = std::str::from_utf8(record.value.as_ref())?;
    let last: i32 = string.parse()?;
    PREV.store(last, SeqCst);
    Ok(())
}
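Because both functions only look at the record bytes, their logic can be checked without a cluster. The sketch below is a plain-Rust port under assumptions: `filter_bytes` and `look_back_bytes` are hypothetical names that take the raw value instead of a `SmartModuleRecord`, and a boxed error stands in for `fluvio_smartmodule::Result`.

```rust
use std::error::Error;
use std::sync::atomic::{AtomicI32, Ordering::SeqCst};

// Same shared threshold as PREV in the SmartModule above.
static PREV: AtomicI32 = AtomicI32::new(0);

// Hypothetical stand-in for fluvio_smartmodule::Result: any boxed error.
type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Hypothetical port of `filter` that takes the raw record value.
fn filter_bytes(value: &[u8]) -> Result<bool> {
    let current: i32 = std::str::from_utf8(value)?.parse()?;
    let last = PREV.load(SeqCst);
    if current > last {
        // Accept and raise the threshold, exactly like the original.
        PREV.store(current, SeqCst);
        Ok(true)
    } else {
        Ok(false)
    }
}

/// Hypothetical port of `look_back`: remember the lookback record's value.
fn look_back_bytes(value: &[u8]) -> Result<()> {
    let last: i32 = std::str::from_utf8(value)?.parse()?;
    PREV.store(last, SeqCst);
    Ok(())
}
```

Calling `look_back_bytes(b"3")` and then `filter_bytes(b"4")` returns `Ok(true)`, while a following `filter_bytes(b"1")` returns `Ok(false)`. A non-numeric value such as `b"abc"` returns an error because of the `?` on `parse`.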

With the following transforms.yaml configuration, we request only the last record:

transforms:
  - uses: local/filter-with-lookback@0.1.0
    lookback:
      last: 1
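With `last: 1`, look_back sees exactly one record. If the window were larger, look_back would run once per lookback record (assumed here to be delivered oldest first), and because it calls `store`, the seed would be the newest value rather than the largest. The sketch compares that behavior with a hypothetical variant using `AtomicI32::fetch_max`; `seed_with_store` and `seed_with_max` are illustrative names, not part of the example.

```rust
use std::sync::atomic::{AtomicI32, Ordering::SeqCst};

/// Seeds the threshold the way the example's look_back does: each call
/// overwrites the previous value, so the newest lookback record wins.
fn seed_with_store(lookback: &[i32]) -> i32 {
    let prev = AtomicI32::new(0);
    for &value in lookback {
        prev.store(value, SeqCst);
    }
    prev.load(SeqCst)
}

/// Hypothetical variant: keep the largest lookback value instead.
fn seed_with_max(lookback: &[i32]) -> i32 {
    let prev = AtomicI32::new(0);
    for &value in lookback {
        prev.fetch_max(value, SeqCst);
    }
    prev.load(SeqCst)
}
```

For lookback records 5, 2, 3, `seed_with_store` returns 3 while `seed_with_max` returns 5. Which one fits depends on whether the threshold should track the latest value or the maximum seen; with a `last: 1` window both agree.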


Test with Fluvio CLI

First, let’s generate a new SmartModule using the SMDK tool:

$ smdk generate
Using hub https://hub-dev.infinyon.cloud
🤷   Project Name: filter-with-lookback
✔ 🤷   Will your SmartModule be public? · false
✔ 🤷   Which type of SmartModule would you like? · filter
✔ 🤷   Will your SmartModule use init parameters? · false
[1/7]   Done: .gitignore
[2/7]   Done: Cargo.toml
[3/7]   Done: README.md
[4/7]   Done: SmartModule.toml
[5/7]   Done: rust-toolchain.toml
[6/7]   Done: src/lib.rs
[7/7]   Done: src

Then, replace the contents of the src/lib.rs file with the code snippet from above.

Now we are ready to build the SmartModule and load it into the cluster:

$ smdk build
$ smdk load

Let’s produce 3 records into our topic:

$ fluvio produce test_topic
> 1
Ok!
> 2
Ok!
> 3
Ok!

In another terminal, run the Fluvio consumer with the transforms.yaml file from the example above. It will use our SmartModule with the configured lookback:

$ fluvio consume test_topic --transforms-file transforms.yaml

If you produce another record, the consumer outputs it only if it is greater than 3 (the last record value when the consumer started).
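The consumer session can be traced with a small model of the filter's state. `session` is a hypothetical helper that applies the same strictly-greater rule as `filter` to plain integers, starting from the value look_back stored (3 here).

```rust
/// Hypothetical replay of the consumer session: `seed` is the value look_back
/// stored, `produced` are records arriving after the consumer starts.
/// Returns the records the consumer would print.
fn session(seed: i32, produced: &[i32]) -> Vec<i32> {
    let mut prev = seed; // plays the role of PREV
    let mut printed = Vec::new();
    for &current in produced {
        // Same rule as `filter`: pass and raise the threshold only if strictly greater.
        if current > prev {
            prev = current;
            printed.push(current);
        }
    }
    printed
}
```

For example, `session(3, &[4, 2, 4, 6])` returns `[4, 6]`: once 4 passes, it becomes the new threshold, so a second 4 is filtered out. "Greater than 3" therefore describes only the first record after the consumer starts.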


Test with SMDK tool

We will use the smdk test subcommand to verify that our SmartModule works.

The --lookback-last 1 option sets the lookback parameter to “read the last record”.
Each --record "N" option adds a record that exists in the topic before the SmartModule starts processing.

The following command passes record “3” (the last of the existing records “2” and “3”) to look_back and record “4” to the filter function:

$ smdk test --text 4 --lookback-last 1 --record 2 --record 3
4

The output is 4. But if we run:

$ smdk test --text 1 --lookback-last 1 --record 2 --record 3

the record is filtered out, as expected, because 1 is not greater than 3.