Introduction

Motivation

We have tools to "explore" the Cardano blockchain, which are useful when you know what you're looking for. We argue that there's a different, complementary use-case which is to "observe" the blockchain and react to particular event patterns.

Oura is a rust-native implementation of a pipeline that connects to the tip of a Cardano node through a combination of Ouroboros mini-protocols (using either a unix socket or tcp bearer), filters the events that match a particular pattern, and then submits a succinct, self-contained payload to pluggable observers called "sinks".

Etymology

The name of the tool is inspired by the tail command available in unix-like systems, which is used to display the tail end of a text file or piped data. Cardano's consensus protocol name, Ouroboros, is a reference to the ancient symbol depicting a serpent or dragon eating its own tail; the word itself means "tail eating". "Oura" is the ancient Greek word for "tail".

Under the Hood

All the heavy lifting required to communicate with the Cardano node is done by the Pallas library, which provides an implementation of the Ouroboros multiplexer and a few of the required mini-protocol state-machines (ChainSync and LocalState in particular).

The data pipeline makes heavy use (maybe a bit too much) of multi-threading and mpsc channels provided by Rust's std::sync library.

Use Cases

CLI to Watch Live Transactions

You can run oura watch <socket> to print TX data into the terminal from the tip of a local or remote node. It can be useful as a debugging tool for developers or if you're just curious to see what's going on in the network (for example, to see airdrops as they happen or oracles posting new information).

As a Bridge to Other Persistence Mechanisms

Similar to the well-known db-sync tool provided by IOHK, Oura can be used as a daemon to follow a node and output the data into a different data storage technology more suited for your final use-case. The main difference with db-sync is that Oura was designed for easy integration with data-streaming pipelines instead of relational databases.

Given its small memory / cpu footprint, Oura can be deployed side-by-side with your Cardano node even in resource-constrained environments, such as Raspberry Pis.

As a Trigger of Custom Actions

Oura running in daemon mode can be configured to use custom filters to pinpoint particular transaction patterns and trigger actions whenever it finds a match. For example: send an email when a particular policy / asset combination appears in a transaction; call an AWS Lambda function when a wallet delegates to a particular pool; send an HTTP call to a webhook each time a metadata key appears in the TX payload.
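As an illustrative sketch of the webhook scenario (the endpoint URL and metadata label are placeholders), a daemon pipeline could combine a Selection filter with the Webhook sink:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"

[[filters]]
type = "Selection"

[filters.check]
predicate = "metadata_label_equals"
argument = "721"

[sink]
type = "Webhook"
url = "https://my-webhook.example.com/events"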

As a Library for Custom Scenarios

If the available out-of-the-box features don't satisfy your particular use-case, Oura can be used as a library in your Rust project to set up tailor-made pipelines. Each component (sources, filters, sinks, etc) in Oura aims at being self-contained and reusable. For example, custom filters and sinks can be built while reusing the existing sources.

(Experimental) Windows Support

Oura's Windows support is currently experimental; the Windows build only supports the Node-to-Node source with the tcp socket bearer.

Installation

Depending on your needs, Oura provides different installation options:

  • Binary Release: to use one of our pre-compiled binary releases for the supported platforms.
  • From Source: to compile a binary from source code using Rust's toolchain
  • Docker: to run the tool from a pre-built docker image
  • Kubernetes: to deploy Oura as a resource within a Kubernetes cluster

Binary Releases

Oura can be run as a standalone executable. Follow the instructions for your particular OS / Platform to install a local copy from our Github pre-built releases.

MacOS

Use the following command to download and install Oura's binary release for MacOS:

curl --silent --location https://git.io/JD2iH | \
    tar xz -C /tmp && mv /tmp/oura /usr/local/bin

GNU/Linux

Use the following command to download and install Oura's binary release for GNU/Linux:

curl --silent --location https://git.io/JD2ix | \
    tar xz -C /tmp && mv /tmp/oura /usr/local/bin

Check the latest release for more binary download options.

From Source

The following instructions show how to build and install Oura from source code.

Pre-requisites

  • Rust toolchain

Procedure

git clone git@github.com:txpipe/oura.git
cd oura
cargo install --all-features --path .

Docker

Oura provides pre-built public Docker images through GitHub Packages. To execute Oura via Docker, use the following command:

docker run ghcr.io/txpipe/oura:latest

The result of the above command should show Oura's command-line help message.

Entry Point

The entry-point of the image points to the Oura executable. You can pass the same command-line arguments that you would pass to the binary release running on bare metal. For example:

docker run -it ghcr.io/txpipe/oura:latest \
    watch relays-new.cardano-mainnet.iohk.io:3001 \
    --bearer tcp

For more information on available command-line arguments, check the usage section.

Using a Configuration File

The default daemon configuration file for Oura is located at /etc/oura/daemon.toml. To run Oura in daemon mode with a custom configuration file, you need to mount it in the correct location. The following example runs a docker container in the background using a configuration file named daemon.toml located in the current folder:

docker run -d -v $(pwd)/daemon.toml:/etc/oura/daemon.toml \
    ghcr.io/txpipe/oura:latest daemon

Versioned Images

Images are also tagged with the corresponding version number. It is highly recommended to pin a fixed image version in production environments to avoid the effects of new features being included in each release (please remember Oura hasn't yet reached v1 stability guarantees).

To use a versioned image, replace the latest tag with the desired version, prefixed with v. For example, to use version 1.0.0, use the following image:

ghcr.io/txpipe/oura:v1.0.0

Kubernetes

Oura running in daemon mode can be deployed in a Kubernetes cluster. Depending on your needs, we recommend either of the following approaches.

Approach #1: Sidecar Container

Oura can be loaded as a sidecar container in the same pod as your Cardano node. Since containers in a pod share the same file-system layer, it's easy to point Oura to the unix-socket of the node.

The following example yaml shows a redacted version of a Cardano node resource defined as a Kubernetes StatefulSet (STS). Pay attention to the extra container and to the volume mounts used to share the unix socket.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: cardano-node
spec:

  # REDACTED: here goes your normal cardano node sts / deployment spec

  template:
    spec:

      # REDACTED: here goes your normal cardano node pod specs

      containers:
      - name: main

        # REDACTED: here goes your normal cardano node container properties

        # add a new volume mount to enable the socket to be
        # consumed by the 2nd container in the pod (Oura)
        volumeMounts:
        - mountPath: /opt/cardano/cnode/sockets/node0.socket
          name: unix-socket

      # add a 2nd container pointing to the _Oura_ image
      - name: oura
        image: ghcr.io/txpipe/oura:latest

        # we mount the same volume that the main container uses as the source
        # for the Cardano node unix socket.
        volumeMounts:
        - mountPath: /opt/cardano/cnode/sockets/node0.socket
          name: unix-socket
        - mountPath: /etc/oura
          name: oura-config

      volumes:

      # REDACTED: here goes any required volume for your normal cardano node setup

      # empty-dir volume to share the unix socket between containers
      - name: unix-socket
        emptyDir: {}

      # a config map resource with Oura's config particular for your requirements
      - name: oura-config
        configMap:
          name: oura-config
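The oura-config ConfigMap referenced above must provide the daemon.toml for the sidecar. The following is a minimal sketch, assuming the N2C source reads from the shared unix socket (replace the sink section with whatever fits your use-case):

apiVersion: v1
kind: ConfigMap
metadata:
  name: oura-config
data:
  daemon.toml: |-
    [source]
    type = "N2C"
    address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
    magic = "mainnet"

    [sink]
    # REDACTED: here goes your `sink` configuration options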

Approach #2: Standalone Deployment

Oura can be deployed as a standalone Kubernetes Deployment resource. This is useful if your Cardano node is not part of your Kubernetes cluster or if you want to keep your node strictly isolated from access by a sidecar container.

Please note that the number of replicas is set to 1. Oura doesn't have any kind of "coordination" between instances. Adding more than one replica will just create extra pipelines duplicating the same work.

apiVersion: v1
kind: ConfigMap
metadata:
  name: oura
data:
  daemon.toml: |-
    [source]
    # REDACTED: here goes your `source` configuration options

    [[filters]]
    # REDACTED: here goes your `filters` configuration options

    [sink]
    # REDACTED: here goes your `sink` configuration options
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: oura
  labels:
    app: oura
spec:
  replicas: 1
  selector:
    matchLabels:
      app: oura
  template:
    metadata:
      labels:
        app: oura
    spec:
      containers:
      - name: main
        image: ghcr.io/txpipe/oura:latest
        env:
          - name: "RUST_LOG"
            value: "info"
        resources:
          requests:
            memory: 50Mi
            cpu: 50m
          limits:
            memory: 200Mi
            cpu: 200m
        args:
          - "daemon"
        volumeMounts:
          - mountPath: /etc/oura
            name: config
      volumes:
      - name: config
        configMap:
          name: oura

Usage

Oura provides three different execution modes:

  • Daemon: a fully-configurable pipeline that runs in the background. Sources, filters and sinks can be combined to fulfil particular use-cases.
  • Watch: to watch live block events from a node directly in the terminal. It is meant for humans; it uses colors and throttling to facilitate reading.
  • Dump: to dump live block events from a node into rotating log files or stdout. It uses the JSONL format for persistence of the events.

Watch Mode

The watch mode provides a quick way to tail the latest events from the blockchain. It connects directly to a Cardano node using either node-to-client or node-to-node protocols. The output is sent into the terminal in a human-readable fashion.

The output is colorized by type of event and dynamically truncated to fit the width of the terminal. The speed of the output lines is throttled to facilitate visual inspection of each event; otherwise, all events for a block would be output simultaneously.

Usage

To start Oura in watch mode, use the following command from your terminal:

oura watch [OPTIONS] <socket>
  • <socket>: this is a required argument that specifies how to connect to the cardano node. It can either be a tcp address (<host>:<port> syntax) or a file path pointing to the location of the unix socket.

Options

  • --bearer <bearer>: an option that specifies the type of bearer to use. Possible values are tcp and unix. If omitted, the value unix is used as default.
  • --magic <magic>: the magic number of the network you're connecting to. Possible values are mainnet, testnet, preview, preprod or a numeric value. If omitted, the value mainnet is used as default.
  • --mode <mode>: an option to force which set of mini-protocols to use when connecting to the Cardano node. Possible values: node and client. If omitted, Oura will infer the standard protocols for the specified bearer.
  • --since <slot>,<hash>: an option to specify the point in the chain from which Oura should start reading. The point is referenced by passing the slot of the block followed by a comma and the hash of the block (<slot>,<hash>). If omitted, Oura will start reading from the tail (tip) of the node.
  • --wrap: indicates that long output text should break and continue in the following line. If omitted, lines will be truncated to fit in the available terminal width.

Examples

Watch Live Data From A Remote Relay Node

oura watch relays-new.cardano-mainnet.iohk.io:3001 --bearer tcp

Watch Live Data From A Local Node Via Unix Socket

oura watch /opt/cardano/cnode/sockets/node0.socket --bearer unix

Watch Live Data From The Tip Of A Local Testnet Node

oura watch /opt/cardano/cnode/sockets/node0.socket --bearer unix --magic testnet

Watch Data Starting At A Particular Block

oura watch relays-new.cardano-mainnet.iohk.io:3001 \
    --bearer tcp \
    --since 49159253,d034a2d0e4c3076f57368ed59319010c265718f0923057f8ff914a3b6bfd1314

Watch Live Data From the "Preview" testnet

oura watch preview-node.world.dev.cardano.org:30002 --bearer tcp --magic preview

Watch Live Data From the "Pre-Production" testnet

oura watch preprod-node.world.dev.cardano.org:30000 --bearer tcp --magic preprod

Daemon Mode

Oura's daemon mode processes data in the background, without any live output. This mode is used in scenarios where you need to continuously bridge blockchain data with other persistence mechanisms or to trigger an automated process in response to a particular event pattern.

Start Daemon Mode

To start Oura in daemon mode, use the following command:

oura daemon

Available options:

  • --config: path of a custom toml configuration file to use. If not specified, configuration will be loaded from /etc/oura/daemon.toml.
  • --cursor: a custom point in the chain to use as the starting point. Expects the format slot,hex-hash. If not specified, Oura will look for the cursor section in the toml configuration or fall back to the intersect options of the source stage.

Example of starting daemon mode with default config file:

# config will be loaded from /etc/oura/daemon.toml
oura daemon

Example of starting daemon mode with a custom config file at my_config.toml:

oura daemon --config my_config.toml

Example of starting daemon mode specifying a particular cursor:

oura daemon --cursor 56134714,2d2a5503c16671ac7d5296f8e6bfeee050b2c2900a7d8c97b36c434667eb99d9

Configuration

The configuration file needs to specify the source, filters and sink to use in a particular pipeline. The following toml represents the typical skeleton of an Oura config file:

[source]
type = "X" # the type of source to use

# custom config fields for this source type
foo = "abc"
bar = "xyz"

[[filters]]
type = "Y" # the type of filter to use

# custom config fields for this filter type
foo = "123"
bar = "789"

[sink]
# the type of sink to use
type = "Z"

# custom config fields for this sink type
foo = "123"
bar = "789"

# optional cursor settings, remove section to disable the feature
[cursor]
type = "File"
path = "/var/oura/cursor"

The source section

This section specifies the origin of the data. The special type field must always be present and contain a value matching any of the available built-in sources. The rest of the fields in the section will depend on the selected type. See the sources section for a list of available options and their corresponding config values.

The filters section

This section specifies a collection of filters that are applied in sequence to each event. The special type field must always be present and contain a value matching any of the available built-in filters. Notice that this section of the config is an array, allowing multiple filter sections per config file. See the filters section for a list of available options and their corresponding config values.

The sink section

This section specifies the destination of the data. The special type field must always be present and contain a value matching any of the available built-in sinks. The rest of the fields in the section will depend on the selected type. See the sinks section for a list of available options.

The cursor section

This section specifies how to configure the cursor feature. A cursor is a reference to the current position of the pipeline. If the pipeline needs to restart for whatever reason and a cursor is available, it will start reading from that point in the chain. Removing the section from the config will disable the cursor feature.

Full Example

Here's an example configuration file that uses a Node-to-Node source and outputs the events into a Kafka sink:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"

[[filters]]
type = "Fingerprint"

[[filters]]
type = "Selection"
predicate = "variant_in"
argument = ["Block", "Transaction"]

[sink]
type = "Kafka"
brokers = ["127.0.0.1:53147"]
topic = "testnet"

Dump Mode

The dump mode provides a quick way to tail the latest events from the blockchain and output raw data into stdout or the file system. It connects directly to a Cardano node using either node-to-client or node-to-node protocols. The output is formatted using JSONL (json, one line per event). This command is intended mainly as a quick persistence mechanism for blockchain data, such as keeping a log of blocks / transactions. It can also be used for "piping" stdout into other shell commands.

If an output path is specified, data will be saved as a set of rotating log files. Each log file will contain a maximum of ~50 MB. A total of 200 files will be stored before the older ones start being deleted. Gzip is used to compress old files as they are rotated.

For real-time, human-inspection of the events, use the watch command.

Usage

To start Oura in dump mode, use the following command from your shell:

oura dump [OPTIONS] <socket>
  • <socket>: this is a required argument that specifies how to connect to the cardano node. It can either be a tcp address (<host>:<port> syntax) or a file path pointing to the location of the unix socket.

Options

  • --bearer <bearer>: an option that specifies the type of bearer to use. Possible values are tcp and unix. If omitted, the value unix is used as default.
  • --magic <magic>: the magic number of the network you're connecting to. Possible values are mainnet, testnet or a numeric value. If omitted, the value mainnet is used as default.
  • --mode <mode>: an option to force which set of mini-protocols to use when connecting to the Cardano node. Possible values: node and client. If omitted, Oura will infer the standard protocols for the specified bearer.
  • --since <slot>,<hash>: an option to specify the point in the chain from which Oura should start reading. The point is referenced by passing the slot of the block followed by a comma and the hash of the block (<slot>,<hash>). If omitted, Oura will start reading from the tail (tip) of the node.
  • --output <path-like>: an option to specify an output file prefix for storing the log files. Logs are rotated, so a timestamp will be added as a suffix to the final filename. If omitted, data will be sent to stdout.

Examples

Dump Data From A Remote Relay Node into Stdout

oura dump relays-new.cardano-mainnet.iohk.io:3001 --bearer tcp

Dump Data From A Remote Relay Node into Rotating Files

oura dump relays-new.cardano-mainnet.iohk.io:3001 --bearer tcp --output ./mainnet-logs

Pipe Data From A Remote Relay Node into a new Shell Command

oura dump relays-new.cardano-mainnet.iohk.io:3001 --bearer tcp | grep block

Library

Coming Soon!

Filters

Filters are intermediate steps in the pipeline that process events as they travel from sources to sinks. They might serve different purposes, such as: selecting relevant events, enriching event data, transforming data representation, etc.

Built-in Filters

These are the existing filters that are included as part of the main Oura codebase:

  • Fingerprint: a filter that adds a (probably) unique identifier (fingerprint) to each event.
  • Selection: a filter that evaluates different built-in predicates to decide which events should go through the pipeline.

New filters are being developed; information will be added to this documentation to reflect the updated list. Contributions and feature requests are welcome in our GitHub repo.

Fingerprint Filter

A filter that computes a (probably) unique identifier for each event and appends it as part of the data passed forward.

Dealing with duplicated records is a common problem in data pipelines. A common workaround is to use identifiers based on the hash of the data for each record. In this way, a duplicated record would yield the same hash, allowing the storage engine to discard the extra instance.

The fingerprint filter uses the non-cryptographic hash algorithm murmur3 to compute an id for each Oura event with a very low collision level. We use a non-cryptographic hash because it is faster to compute and none of the cryptographic properties are required for this use-case.

When enabled, this filter will set the fingerprint property of the Event data structure passed through each stage of the pipeline. Sinks at the end of the process might leverage this value as primary key of the corresponding storage mechanism.

Configuration

Adding the following section to the daemon config file will enable the filter as part of the pipeline:

[[filters]]
type = "Fingerprint"

Section: filter

  • type: the literal value Fingerprint.

Selection Filter

A filter that evaluates a set of configurable predicates against each event in the pipeline to decide which records should be sent to the following stage.

Not every use-case requires each and every event to be processed. For example, a pipeline interested in creating a 'metadata' search engine might not care about transaction outputs. With a similar logic, a pipeline aggregating transaction amounts might not care about metadata. The selection filter provides a way to optimize the pipeline so that only relevant events are processed.

The filter works by evaluating a predicate against each event. If the predicate returns true, the event will continue down the pipeline. If the predicate evaluates to false, the event will be dropped. We currently provide some common built-in predicates to facilitate common use-cases (eg: matching event type, matching policy id, matching a metadata key, etc.). We also provide some 'connecting' predicates like all_of, any_of, and not which can be used to create complex conditions by composing other predicates.

Configuration

Adding the following section to the daemon config file will enable the filter as part of the pipeline:

[[filters]]
type = "Selection"

[filters.check]
predicate = "<predicate kind>"
argument = <predicate argument>

Section: filters

  • type: the literal value Selection.

Section filters.check

  • predicate: the key of the predicate to use for the evaluation. See the list of available predicates for possible values.
  • argument: a polymorphic argument that specializes the behavior of the predicate in some way.

Available Predicates

  • variant_in (string[]): This predicate will yield true when the variant of the event matches any of the items in the argument array.
  • variant_not_in (string[]): This predicate will yield true when the variant of the event doesn't match any of the items in the argument array.
  • policy_equals (string): This predicate will yield true when the policy of a mint or output asset matches the value in the argument.
  • asset_equals (string): This predicate will yield true when the asset of a mint or output asset matches the value in the argument.
  • metadata_label_equals (string): This predicate will yield true when the root label of a metadata entry matches the value in the argument.
  • metadata_any_sub_label_equals (string): This predicate will yield true when at least one of the sub labels in a metadata entry matches the value in the argument.
  • not (predicate): This predicate will yield true when the predicate in the arguments yields false.
  • any_of (predicate[]): This predicate will yield true when any of the predicates in the argument yields true.
  • all_of (predicate[]): This predicate will yield true when all of the predicates in the argument yield true.

Examples

Allowing only block and transaction events to pass:

[[filters]]
type = "Selection"

[filters.check]
predicate = "variant_in"
argument = ["Block", "Transaction"]

Using the not predicate to allow all events except the variant Transaction:

[[filters]]
type = "Selection"

[filters.check]
predicate = "not"

[filters.check.argument]
predicate = "variant_in"
argument = ["Transaction"]

Using the any_of predicate to filter events presenting any of two different policies (Boolean "or"):

[filters.check]
predicate = "any_of"

[[filters.check.argument]]
predicate = "policy_equals"
argument = "4bf184e01e0f163296ab253edd60774e2d34367d0e7b6cbc689b567d"

[[filters.check.argument]]
predicate = "policy_equals"
argument = "a5bb0e5bb275a573d744a021f9b3bff73595468e002755b447e01559"

Using the all_of predicate to filter only "asset" events presenting a particular policy (Boolean "and"):

[filters.check]
predicate = "all_of"

[[filters.check.argument]]
predicate = "variant_in"
argument = ["OutputAsset"]

[[filters.check.argument]]
predicate = "policy_equals"
argument = "a5bb0e5bb275a573d744a021f9b3bff73595468e002755b447e01559"

Sources

Sources represent the "origin" of the events processed by Oura. Any compatible source is responsible for feeding blockchain data into Oura's pipeline for further processing.

Built-in Sources

These are the currently available sources included as part of the main Oura codebase:

  • N2N: an Ouroboros agent that connects to a Cardano node using node-2-node protocols
  • N2C: an Ouroboros agent that connects to a Cardano node using node-2-client protocols

New sources are being developed; information will be added to this documentation to reflect the updated list. Contributions and feature requests are welcome in our GitHub repo.

Node-to-Node

The Node-to-Node (N2N) source uses Ouroboros mini-protocols to connect to a local or remote Cardano node through a tcp socket bearer and fetches block data using the ChainSync mini-protocol instantiated to "headers only" and the BlockFetch mini-protocol for retrieval of the actual block payload.

Configuration

The following snippet shows an example of how to set up a typical N2N source:

[source]
type = "N2N"
address = ["Tcp", "<hostname:port>"]
magic = <network magic>

[source.intersect]
type = <intersect strategy>
value = <intersect argument>

[source.mapper]
include_block_end_events = <bool>
include_transaction_details = <bool>
include_transaction_end_events = <bool>
include_block_cbor = <bool>

Section source:

  • type: this field must be set to the literal value N2N
  • address: a tuple describing the location of the tcp endpoint. The second element must be specified as a string with hostname and port number.
  • magic: the magic of the network that the node is running (mainnet, testnet or a custom numeric value)
  • since: (deprecated, please use intersect) the point in the chain from which reading of events should start. It must be specified as a tuple of slot (integer) and block hash (hex string)

Section source.intersect

This section provides advanced options for instructing Oura from which point in the chain to start reading. Read the intersect options documentation for detailed information.

Section source.mapper

This section provides a way to opt in to advanced behaviour for how the raw chain data is mapped into Oura events. Read the mapper options documentation for detailed information.

Examples

Connecting to a remote Cardano node in mainnet through tcp sockets:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"

Connecting to a remote Cardano node in testnet through tcp sockets:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "testnet"

Start reading from a particular point in the chain:

[source]
type = "N2C"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"

[source.intersect]
type = "Point"
value = [48896539, "5d1f1b6149b9e80e0ff44f442e0cab0b36437bb92eacf987384be479d4282357"]

Include all details inside the transaction events:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"


[source.mapper]
include_transaction_details = true
include_block_cbor = true

Node-to-Client

The Node-to-Client (N2C) source uses Ouroboros mini-protocols to connect to a local Cardano node through a unix socket bearer and fetches block data using the ChainSync mini-protocol instantiated to "full blocks".

Configuration

The following snippet shows an example of how to set up a typical N2C source:

[source]
type = "N2C"
address = ["Unix", "<socket location>"]
magic = <network magic>

[source.intersect]
type = <intersect strategy>
value = <intersect argument>

[source.mapper]
include_block_end_events = <bool>
include_transaction_details = <bool>
include_transaction_end_events = <bool>
include_block_cbor = <bool>

Section source:

  • type: this field must be set to the literal value N2C
  • address: a tuple describing the location of the socket
  • magic: the magic of the network that the node is running (mainnet, testnet or a custom numeric value)
  • since: (deprecated, please use intersect) the point in the chain from which reading of events should start. It must be specified as a tuple of slot (integer) and block hash (hex string)

Section source.intersect

This section provides advanced options for instructing Oura from which point in the chain to start reading. Read the intersect options documentation for detailed information.

Section source.mapper

This section provides a way to opt in to advanced behaviour for how the raw chain data is mapped into Oura events. Read the mapper options documentation for detailed information.

Examples

Connecting to a local Cardano node in mainnet through unix sockets:

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "mainnet"

Connecting to a local Cardano node in testnet through unix sockets:

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "testnet"

Start reading from a particular point in the chain:

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "mainnet"

[source.intersect]
type = "Point"
value = [48896539, "5d1f1b6149b9e80e0ff44f442e0cab0b36437bb92eacf987384be479d4282357"]

Include all details inside the transaction events:

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "mainnet"


[source.mapper]
include_transaction_details = true
include_block_cbor = true

Sinks

Sinks are the "destination" of the events processed by Oura. They are the concrete link between the internal representation of the data records and the corresponding external service interested in the data. Typical sinks include: database engines, stream-processing engines, web API calls and FaaS solutions.

Built-in Sinks

These are the existing sinks that are included as part of the main Oura codebase:

  • Terminal: a sink that outputs events into stdout with fancy coloring
  • Kafka: a sink that sends each event into a Kafka topic
  • Elasticsearch: a sink that writes events into an Elasticsearch index or data stream.
  • Webhook: a sink that outputs each event as an HTTP call to a remote endpoint.
  • Logs: a sink that saves events to the file system using JSONL text files.
  • AWS SQS: a sink that sends each event as message to an AWS SQS queue.
  • AWS Lambda: a sink that invokes an AWS Lambda function for each event.
  • AWS S3: a sink that saves the CBOR content of the blocks as an AWS S3 object.
  • Redis Streams: a sink that sends each event into a Redis stream.

New sinks are being developed; information will be added to this documentation to reflect the updated list. Contributions and feature requests are welcome in our GitHub repo.

Terminal

A sink that outputs each event into the terminal through stdout using fancy coloring 💅.

Configuration

[sink]
type = "Terminal"
throttle_min_span_millis = 500
wrap = true
  • type (required): the literal value Terminal.
  • throttle_min_span_millis (optional, default = 500): the amount of time (milliseconds) to wait between printing each event into the console. This is used to facilitate reading for a human following the output.
  • wrap (optional, default = false): a true value indicates that long output text should break and continue in the following line. If false, lines will be truncated to fit in the available terminal width.

Kafka

A sink that implements a Kafka producer. Each event is json-encoded and sent as a message to a single Kafka topic.

Configuration

[sink]
type = "Kafka"
brokers = ["kafka-broker-0:9092"]
topic = "cardano-events"
  • type: the literal value Kafka.
  • brokers: indicates the location of the Kafka brokers within the network. Several hostname:port pairs can be added to the array for a "cluster" scenario.
  • topic: indicates which Kafka topic to use for the outbound messages.

Elasticsearch

A sink that outputs events into an Elasticsearch server. Each event is json-encoded and sent as a message to an index or data stream.

Configuration

[sink]
type = "Elastic"
url = "https://localhost:9200"
index = "oura.sink.v0.mainnet"

[sink.credentials]
type = "Basic"
username = "oura123"
password = "my very secret stuff"

Section: sink

  • type: the literal value Elastic.
  • url: the location of the Elasticsearch API
  • index: the name of the index (or data stream) to store the event documents
  • idempotency (optional): flag that if enabled will force idempotent calls to ES (to avoid duplicates)
  • retry_policy (optional): controls the policy to retry failed requests (see retry policy)

Section: sink.credentials

This section configures the auth mechanism against Elasticsearch. You can remove the whole section from the configuration if you want to skip authentication altogether (maybe private cluster without auth?).

We currently only implement basic auth; other mechanisms will be implemented at some point.

  • type: the mechanism to use for authentication, only Basic is currently implemented
  • username: username of the user with access to Elasticsearch
  • password: password of the user with access to Elasticsearch

Idempotency

In services and API calls, idempotency refers to a property of the system where the execution of multiple "equivalent" requests has the same effect as a single request. In other words, "idempotent" calls can be triggered multiple times without problem.

In our Elasticsearch sink, when the idempotency flag is enabled, each document sent to the index will specify a particular content-based ID: the fingerprint of the event. If Oura restarts without a cursor, or if the same block is processed again for any reason, repeated events will present the same ID; Elasticsearch will reject the duplicates and Oura will continue with the following event. This mechanism provides a strong guarantee that our index won't contain duplicate data.

If the flag is disabled, each document will be generated using a random ID, ensuring that it will be indexed regardless.
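Since the idempotent ID is derived from the event fingerprint, this mode works best when combined with the Fingerprint filter. A hedged sketch of the relevant daemon config fragments:

[[filters]]
type = "Fingerprint"

[sink]
type = "Elastic"
url = "https://localhost:9200"
index = "oura.sink.v0.mainnet"
idempotency = true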

Webhook

A sink that outputs each event as an HTTP call to a remote endpoint. Each event is json-encoded and sent as the body of a request using POST method.

The sink expects a 200 response code for each HTTP call. If received, the process will continue with the next message. If an error occurs (either at the tcp or http level), the sink will apply the corresponding retry logic as specified in the configuration.

Configuration

[sink]
type = "Webhook"
url = "https://endpoint:5000/events"
authorization = "user:pass"
timeout = 30000
error_policy = "Continue"

[sink.retry_policy]
max_retries = 30
backoff_unit =  5000
backoff_factor = 2
max_backoff = 100000

[sink.headers]
extra_header_1 = "abc"
extra_header_2 = "123"

Section: sink

  • type: the literal value Webhook.
  • url: url of your remote endpoint (needs to accept POST method)
  • authorization (optional): value to add as the 'Authorization' HTTP header
  • headers (optional): key-value map of extra headers to pass in each HTTP call
  • allow_invalid_certs (optional): a flag to skip TLS cert validation (usually for self-signed certs).
  • timeout (optional): the timeout value for the HTTP response in milliseconds. Default value is 30000.
  • error_policy (optional): either Continue or Exit. Default value is Exit.
  • retry_policy (optional): controls the policy to retry failed requests (see retry policy)

Logs

A sink that saves events into the file system. Each event is json-encoded and appended to the end of a text file. Files are rotated once they reach a certain size. Optionally, old files can be automatically compressed once they have been rotated.

Configuration

Example sink section config

[sink]
type = "Logs"
output_path = "/var/oura/mainnet"
output_format = "JSONL"
max_bytes_per_file = 1_000_000
max_total_files = 10
compress_files = true

Section: sink

  • type: the literal value Logs.
  • output_path: the path-like prefix for the output log files
  • output_format (optional): specifies the type of syntax to use for the serialization of the events. The only available option at the moment is JSONL (json + line break)
  • max_bytes_per_file (optional): the maximum number of bytes to write to a file before rotating it
  • max_total_files (optional): the maximum number of files to keep in the file system before the older ones start being deleted
  • compress_files (optional): a boolean indicating if the rotated files should be compressed.

AWS SQS

A sink that sends each event as a message to an AWS SQS queue. Each event is json-encoded and sent to a configurable SQS queue using AWS API endpoints.

The sink will process each incoming event in sequence and submit the corresponding SendMessage request to the SQS API. Once the queue acknowledges reception of the message, the sink will advance and continue with the following event.

The sink supports both FIFO and Standard queues. The sink configuration will determine which logic to apply. In the case of FIFO, the group id is determined by an explicit configuration value and the message id is defined by the fingerprint value of the event (the Fingerprint filter needs to be enabled).

The sink uses the AWS SDK's built-in retry logic, which can be configured at the sink level. Authentication against AWS is built into the SDK library and follows the common chain of providers (env vars, ~/.aws, etc.).

Configuration

[sink]
type = "AwsSqs"
region = "us-west-2"
queue_url = "https://sqs.us-west-2.amazonaws.com/xxxxxx/my-queue"
fifo = true
group_id = "my_group"
max_retries = 5

Section: sink

  • type: the literal value AwsSqs.
  • region: The AWS region where the queue is located.
  • queue_url: The SQS queue URL provided by AWS (not to be confused with the ARN).
  • fifo: A flag to determine if the queue is of type FIFO.
  • group_id: A fixed group id to be used when sending messages to a FIFO queue.
  • max_retries: The max number of send retries before exiting the pipeline with an error.
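Since FIFO mode takes the message id from the event fingerprint, a daemon pipeline targeting a FIFO queue will typically enable the Fingerprint filter alongside the sink. A minimal sketch (region, queue URL and group id are placeholders):

[[filters]]
type = "Fingerprint"

[sink]
type = "AwsSqs"
region = "us-west-2"
queue_url = "https://sqs.us-west-2.amazonaws.com/xxxxxx/my-queue"
fifo = true
group_id = "my_group"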

AWS Credentials

The sink needs valid AWS credentials to interact with the cloud service. The majority of the SDKs and libraries that interact with AWS follow the same approach to access these credentials from a chain of possible providers:

  • Credentials stored as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  • Web Identity Token credentials from the environment or container (including EKS)
  • ECS credentials (IAM roles for tasks)
  • As entries in the credentials file in the .aws directory in your home directory (~/.aws/)
  • From the EC2 Instance Metadata Service (IAM Roles attached to an instance)

Oura, by means of the Rust AWS SDK, will honor the above chain of providers. Use whichever of the above fits your particular scenario. Please refer to AWS' documentation for more detail.

FIFO vs Standard Queues

Oura processes messages maintaining the sequence of the blocks and respecting the order of the transactions within the block. An AWS SQS FIFO queue provides the consumer with a guarantee that the order in which messages are consumed matches the order in which they were sent.

Connecting Oura with a FIFO queue would provide the consumer with the guarantee that the events received follow the same order as they appeared in the blockchain. This might be useful, for example, in scenarios where the processing of an event requires a reference of a previous state of the chain.

Please note that rollback events might happen upstream, at the blockchain level, which need to be handled by the consumer to unwind any side-effects of the processing of the newly orphaned blocks. This problem can be mitigated by using Oura's rollback buffer feature.

If each event can be processed in isolation, if the process is idempotent or if the order doesn't affect the outcome, the recommended approach is to use a Standard queue which provides "at least once" processing guarantees, relaxing the constraints and improving the overall performance.

Payload Size Limitation

The AWS SQS service has a 256 KB payload size limit. This is more than enough for individual events, but it might be too little for pipelines where the include_cbor_hex option is enabled. If the goal of your pipeline is to access the raw CBOR content, we recommend taking a look at the AWS S3 sink, which provides a direct way of storing the CBOR blocks in an S3 bucket.

AWS Lambda

A sink that invokes an AWS Lambda function for each received event. Each event is json-encoded and sent to a configurable function using AWS API endpoints.

The sink will process each incoming event in sequence, invoke the specified function and wait for the response.

A retry mechanism is available for failures to dispatch the call, but not for failures within the execution of the function. Regardless of whether the function execution succeeds, the sink will advance and continue with the following event.

Authentication against AWS is built into the SDK library and follows the common chain of providers (env vars, ~/.aws, etc.).

Configuration

[sink]
type = "AwsLambda"
region = "us-west-2"
function_name = "arn:aws:lambda:us-west-2:xxxxx:function:my-func"
max_retries = 5

Section: sink

  • type: the literal value AwsLambda.
  • region: The AWS region where the function is located.
  • function_name: The ARN of the function we wish to invoke.
  • max_retries: The max number of send retries before exiting the pipeline with an error.

AWS Credentials

The sink needs valid AWS credentials to interact with the cloud service. The majority of the SDKs and libraries that interact with AWS follow the same approach to access these credentials from a chain of possible providers:

  • Credentials stored as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  • Web Identity Token credentials from the environment or container (including EKS)
  • ECS credentials (IAM roles for tasks)
  • As entries in the credentials file in the .aws directory in your home directory (~/.aws/)
  • From the EC2 Instance Metadata Service (IAM Roles attached to an instance)

Oura, by means of the Rust AWS SDK, will honor the above chain of providers. Use whichever of the above fits your particular scenario. Please refer to AWS' documentation for more detail.

AWS S3

A sink that saves the CBOR content of the blocks as S3 objects.

The sink will process each incoming event in sequence and select only the events of type Block. The CBOR content of the block will be extracted and saved as an S3 object in a configurable bucket, using either hex or binary encoding.

A configurable option allows the user to decide how to name the object using values from the block header (such as epoch, slot, hash, etc). The properties of the block will be saved as metadata of the S3 Object for later identification (eg: block number, hash, slot, etc).

The sink uses the AWS SDK's built-in retry logic, which can be configured at the sink level. Authentication against AWS is built into the SDK library and follows the common chain of providers (env vars, ~/.aws, etc.).

Configuration

[sink]
type = "AwsS3"
region = "us-west-2"
bucket = "my-bucket"
prefix = "mainnet/"
naming = "SlotHash"
content = "Cbor"
max_retries = 5

Section: sink

  • type: the literal value AwsS3.
  • region: The AWS region where the bucket is located.
  • bucket: The name of the bucket to store the blocks.
  • prefix: A prefix to prepend on each object's key.
  • naming: One of the available naming conventions (see section below)
  • content: Either Cbor for binary encoding or CborHex for plain text hex representation of the CBOR
  • max_retries: The max number of send retries before exiting the pipeline with an error.

IMPORTANT: For this sink to work correctly, the include_block_cbor option should be enabled in the source configuration (see mapper options).

Naming Convention

S3 Buckets allow the user to query by object prefix. It's important to use a naming convention that is compatible with the types of filters that the consumer intends to use. This sink provides the following options:

  • Hash: formats the key using "{hash}"
  • SlotHash: formats the key using "{slot}.{hash}"
  • BlockHash: formats the key using "{block_num}.{hash}"
  • EpochHash: formats the key using "{epoch}.{hash}"
  • EpochSlotHash: formats the key using "{epoch}.{slot}.{hash}"
  • EpochBlockHash: formats the key using "{epoch}.{block_num}.{hash}"
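For illustration, combining the prefix from the configuration example above with the SlotHash convention, the resulting object key would look like the following (slot and hash values are taken from earlier examples in this guide):

# prefix = "mainnet/", naming = "SlotHash"
mainnet/48896539.5d1f1b6149b9e80e0ff44f442e0cab0b36437bb92eacf987384be479d4282357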

Content Encoding

The sink provides two options for encoding the content of the object:

  • Cbor: the S3 object will contain the raw, unmodified CBOR value in binary format. The content type of the object in this case will be "application/cbor".
  • CborHex: the S3 object will contain the CBOR payload of the block encoded as a hex string. The content type of the object in this case will be "text/plain".

Metadata

The sink uses the data from the block event to populate metadata fields of the S3 object for easier identification of the block once persisted:

  • era
  • issuer_vkey
  • tx_count
  • slot
  • hash
  • number
  • previous_hash

Please note that S3 doesn't allow filtering by metadata. For efficient filtering, the only option available is to use the prefix of the object key.
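For example, listing all stored blocks under a given slot prefix could be done with the standard AWS CLI (the bucket name and prefix value are illustrative):

aws s3api list-objects-v2 --bucket my-bucket --prefix "mainnet/48896"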

AWS Credentials

The sink needs valid AWS credentials to interact with the cloud service. The majority of the SDKs and libraries that interact with AWS follow the same approach to access these credentials from a chain of possible providers:

  • Credentials stored as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  • Web Identity Token credentials from the environment or container (including EKS)
  • ECS credentials (IAM roles for tasks)
  • As entries in the credentials file in the .aws directory in your home directory (~/.aws/)
  • From the EC2 Instance Metadata Service (IAM Roles attached to an instance)

Oura, by means of the Rust AWS SDK, will honor the above chain of providers. Use whichever of the above fits your particular scenario. Please refer to AWS' documentation for more detail.

Google Cloud PubSub

A sink that sends each event as a message to a PubSub topic. Each event is json-encoded and sent to a configurable PubSub topic.

Configuration

[sink]
type = "GcpPubSub"
topic = "test"

[sink.retry_policy]
max_retries = 30
backoff_unit =  5000
backoff_factor = 2
max_backoff = 100000

Section: sink

  • type: the literal value GcpPubSub.
  • topic: the short name of the topic to send messages to.
  • error_policy (optional): either Continue or Exit. Default value is Exit.
  • retry_policy (optional): controls the policy to retry failed requests (see retry policy)

GCP Authentication

The GCP authentication process relies on the following conventions:

  • If the GOOGLE_APPLICATION_CREDENTIALS environmental variable is specified, the value will be used as the file path to retrieve the JSON file with the credentials.
  • If the server is running on GCP, the credentials will be retrieved from the metadata server.
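For example, when running outside of GCP, you would typically export the variable before starting the daemon (the key file path is a placeholder):

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
oura daemon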

Redis Streams

A sink that outputs events into a Redis Stream.

Redis Streams works as an append-only log where multiple consumers can read from the same queue while keeping independent offsets (as opposed to a PubSub topic, where one subscriber affects the others). You can learn more about the Streams feature in the official Redis documentation.

This sink will process incoming events and send a JSON-encoded message of the payload for each one using the XADD command. The Redis instance can be local or remote.

Configuration

Example configuration that sends all events into a single stream named mystream of a Redis instance running on port 6379 of localhost.

[sink]
type = "Redis"
redis_server = "redis://localhost:6379"
stream_name = "mystream"

Example configuration that sends events into different streams (named by the type of event) of a Redis instance running on port 6379 of localhost.

[sink]
type = "Redis"
redis_server = "redis://localhost:6379"
stream_strategy = "ByEventType"

Section: sink

  • type: the literal value Redis.
  • redis_server: the redis server in the format redis://[<username>][:<password>]@<hostname>[:port][/<db>]
  • stream_name: the name of the redis stream when stream_strategy is None; defaults to "oura" if not specified
  • stream_strategy: None or ByEventType

Conventions

It is possible to send all events to a single stream or to create multiple streams, one for each event type. By applying the selection filter it is possible to define which streams should be created.
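For example, a hedged config fragment that only lets Block and Transaction events through and then writes each variant to its own stream:

[[filters]]
type = "Selection"

[filters.check]
predicate = "variant_in"
argument = ["Block", "Transaction"]

[sink]
type = "Redis"
redis_server = "redis://localhost:6379"
stream_strategy = "ByEventType"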

The sink uses the default Redis convention to define the unique entry ID for each message sent to the stream (<millisecondsTime>-<sequenceNumber>).

Messages in Redis Streams are required to be hashes (maps between the string fields and the string values). This sink will serialize the event into a single-entry map with the following parts:

  • key: the fingerprint value if available, or the event type name.
  • value: the json-encoded payload of the event.

Reference

Data Dictionary

Oura follows a Cardano chain and outputs events. Each event contains data about itself and about the context in which it occurred.

A consumer aggregating a sequence of multiple events will notice redundant / duplicated data. For example, the "block number" value will appear repeated in the context of every event of the same block. This behavior is intended: making each event a self-contained record is an architectural decision. We favor "consumption ergonomics" over "data normalization".

Available Events

The following list represents the already implemented events. These data structures are represented as an enum at the code level.

RollBack Event

Data on a chain rollback (the result of the local node switching to the consensus chain).

  • block_slot (u64): Slot of the rolled back block.
  • block_hash (String): Hash of the rolled back block.



Block Event

Data on an issued block.

  • body_size (usize): Size of the block.
  • issuer_vkey (String): Block issuer public verification key.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.



Transaction Event

Data on a transaction.

  • fee (u64): Transaction fees in lovelace.
  • ttl (Option<u64>): Transaction time to live.
  • validity_interval_start (Option<u64>): Start of transaction validity interval.
  • network_id (Option<u32>): Network ID.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



TxInput Event

Data on a transaction input.

  • tx_id (String): Transaction ID.
  • index (u64): Index of input in transaction inputs.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.
  • input_idx (Option<usize>): Input ID.



TxOutput Event

Data on a transaction output (UTXO).

  • address (String): Address of UTXO.
  • amount (u64): Amount of lovelace in UTXO.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.
  • output_idx (Option<usize>): Output ID.



OutputAsset Event

Data on a non-ADA asset in a UTXO.

  • policy (String): Minting policy of asset.
  • asset (String): Asset ID.
  • amount (u64): Amount of asset.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.
  • output_idx (Option<usize>): Output ID.



Metadata Event

  • key (String): ....
  • subkey (Option<String>): ....
  • value (Option<String>): ....

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



Mint Event

Data on the minting of a non-ADA asset.

  • policy (String): Minting policy of asset.
  • asset (String): Asset ID.
  • quantity (i64): Quantity of asset minted.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



Collateral Event

Data on collateral inputs.

  • tx_id (String): Transaction ID.
  • index (u64): Index of transaction input in inputs.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



PlutusScriptRef Event

  • data (String): ....

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



StakeRegistration Event

Data on stake registration event.

  • credential (StakeCredential): Staking credentials.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



StakeDeregistration Event

Data on stake deregistration event.

  • credential (StakeCredential): Staking credentials.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



StakeDelegation Event

Data on stake delegation event.

  • credential (StakeCredential): Stake credentials.
  • pool_hash (String): Hash of stake pool ID.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



PoolRegistration Event

Data on a pool registration event.

  • operator (String): Stake pool operator ID.
  • vrf_keyhash (String): Keyhash of node VRF operational key.
  • pledge (u64): Stake pool pledge (lovelace).
  • cost (u64): Operational costs per epoch (lovelace).
  • margin (f64): Operator margin.
  • reward_account (String): Account to receive stake pool rewards.
  • pool_owners (Vec<String>): Stake pool owners.
  • relays (Vec<String>): ....
  • pool_metadata (Option<String>): ....

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



PoolRetirement Event

Data on stake pool retirement event.

  • pool (String): Pool ID.
  • epoch (u64): Current epoch.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



GenesisKeyDelegation Event

Data on genesis key delegation.

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Current slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.



MoveInstantaneousRewardsCert Event

  • from_reserves (bool): ....
  • from_treasury (bool): ....
  • to_stake_credentials (Option<BTreeMap<StakeCredential, i64>>): ....
  • to_other_pot (Option<u64>): ....

Context

  • block_number (Option<u64>): Height of block from genesis.
  • slot (Option<u64>): Blockchain slot.
  • tx_idx (Option<usize>): Transaction Index.
  • tx_hash (Option<String>): Transaction hash.

Advanced Features

This section provides detailed information on some of the advanced features available in Oura:

  • Stateful Cursor: provides a mechanism to persist the "position" of the processing pipeline to make it resilient to restarts.
  • Rollback Buffer: provides a way to mitigate the impact of chain rollbacks in downstream stages.
  • Pipeline Metrics: allows operators to track the progress and performance of long-running Oura sessions.
  • Mapper Options: A set of "expensive" event mapping procedures that require an explicit opt-in to be activated.
  • Intersect Options: Advanced options for instructing Oura from which point in the chain to start reading from.
  • Custom Network: Instructions on how to configure Oura for connecting to a custom network.
  • Retry Policy: Instructions on how to configure retry policies for different operations.

Stateful Cursor

The cursor feature provides a mechanism to persist the "position" of the processing pipeline to make it resilient to restarts.

Context

When running in daemon mode, a restart of the process will make Oura start crawling from the initially configured point. This default behavior can be problematic depending on the use-case of the operator.

In scenarios where continuous "coverage" of all processed blocks is important (e.g., building a stateful view of the chain), it's prohibitive to have "gaps" in processing. If Oura is configured to start from the "tip" of the chain, a restart of the process might miss some blocks during the bootstrap procedure.

A valid workaround to the above problem is to configure Oura to start from a fixed point in the chain (see the Intersect Options section below). If a restart occurs, the pipeline will re-process the blocks from that point, ensuring that each block is processed at least once. When working with sinks that handle events idempotently, receiving data from the same block multiple times should not pose a problem.

Although valid, the workaround described above is very inefficient. If the fixed point at which the pipeline starts is too far behind, catching up could take several hours, wasting time and resources.

Feature

Oura implements an optional stateful cursor that receives notifications from the sink stage of the pipeline to continuously track the current position of the chain. At certain checkpoints (every 10 secs by default), the position is persisted onto the file system at a configurable location.

Assuming that a restart occurs and the cursor feature is enabled, the process will attempt to locate and load the persisted value and instruct the source stage to begin reading chain data from the last known position.

Configuration

The cursor feature is a configurable setting available when running in daemon mode. A top level [cursor] section of the daemon toml file controls the feature:

[cursor]
type = "File"
path = "/var/oura/cursor"
  • [cursor]: The presence of this section in the toml file tells Oura to enable the cursor feature.
  • type: The type of persistence backend to use for storing the state of the cursor. The only available option at the moment is File, which stores the cursor in the file system.
  • path: The location of the cursor file within the file system. Default value is var/oura/cursor.

Rollback Buffer

The "rollback buffer" feature provides a way to mitigate the impact of chain rollbacks in downstream stages of the data-processing pipeline.

Context

Handling rollbacks in persistent storage requires clearing the orphaned data / blocks before adding new records. The complexity of this process may vary by storage engine, but it always has an associated impact on performance. In some scenarios, it might even be prohibitive to process events without a reasonable level of confidence about the immutability of the record.

Rollbacks occur frequently under normal conditions, but the chances of a block becoming orphaned diminish as the depth of the block increases. Some Oura use-cases can benefit from this property: some pipelines might prefer fewer rollback events, even if it means waiting for a certain number of confirmations.

Feature

Oura provides a "rollback buffer" that will hold blocks in memory until they reach a certain depth. Only blocks above a min depth threshold will be sent down the pipeline. If a rollback occurs and the intersection is within the scope of the buffer, the rollback operation will occur within memory, totally transparent to the subsequent stages of the pipeline.

If a rollback occurs and the intersection is outside of the scope of the buffer, Oura will fall back to the original behaviour and publish a RollbackEvent so that the "sink" stages may handle the rollback procedure manually.

Trade-off

There's an obvious trade-off to this approach: latency. A pipeline will not process any events until the buffer fills up. Once the initial wait is over, the throughput of the whole pipeline should be equivalent to having no buffer at all (due to Oura's "pipelining" nature). If a rollback occurs, an extra delay will be required to fill the buffer again.

Notice that even if the throughput isn't affected, the latency (measured as the delta between the timestamp at which the event reaches the "sink" stage and the original timestamp of the block) will always be affected by a fixed value proportional to the size of the buffer.
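
As a rough, illustrative calculation: assuming an average block time of around 20 seconds, a buffer with a min depth of 6 blocks adds on the order of 2 minutes of fixed latency before an event reaches the sink.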

Implementation Details

The buffer logic is implemented in the pallas-miniprotocols library. It works by keeping a VecDeque data structure of chain "points": roll-forward operations accumulate at the end of the deque, while retrieving confirmed points means popping from the front of the deque.

Configuration

The min depth is a configurable setting available when running in daemon mode. Higher min_depth values will lower the chances of experiencing a rollback event, at the cost of adding more latency. A node-to-node source stage config would look like this:

[source]
type = "N2N"
address = ["Tcp", "relays-new.cardano-mainnet.iohk.io:3001"]
magic = "mainnet"
min_depth = 6

Node-to-client sources provide an equivalent setting.
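
For illustration only, a node-to-client source carrying the same min_depth setting might look like the following sketch; the socket path and network magic are placeholders to be adjusted to your environment:

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "mainnet"
min_depth = 6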

Pipeline Metrics

The metrics feature allows operators to track the progress and performance of long-running Oura sessions.

Context

Some use-cases require Oura to run either continuously or for prolonged periods of time. Keeping a sink updated in real-time requires 24/7 operation. Dumping historical chain-data from origin might take several hours.

These scenarios require an understanding of the internal state of the pipeline to facilitate monitoring and troubleshooting of the system. In a data-processing pipeline such as this one, two important aspects need to be observable: progress and performance.

Feature

Oura provides an optional /metrics HTTP endpoint that uses the Prometheus format to expose real-time operational metrics of the pipeline. Each stage (source / sink) is responsible for notifying its progress as it processes each event. These notifications are then aggregated into counters and gauges and exposed via HTTP using the well-known Prometheus encoding.

The following metrics are available:

  • chain_tip: the last detected tip of the chain (height)
  • rollback_count: number of rollback events occurred
  • source_current_slot: last slot processed by the source of the pipeline
  • source_current_height: last height (block #) processed by the source of the pipeline
  • source_event_count: number of events processed by the source of the pipeline
  • sink_current_slot: last slot processed by the sink of the pipeline
  • sink_event_count: number of events processed by the sink of the pipeline

Configuration

The metrics feature is a configurable setting available when running in daemon mode. A top level [metrics] section of the daemon toml file controls the feature:

# daemon.toml file

[metrics]
address = "0.0.0.0:9186"
endpoint = "/metrics"
  • [metrics]: The presence of this section enables the feature; if it is absent, no HTTP endpoint will be exposed.
  • address: The address at which the HTTP server will be listening for requests. Expected format is <ip>:<port>. Use the IP value 0.0.0.0 to allow connections on any of the available IP addresses of the network interface. Default value is 0.0.0.0:9186.
  • endpoint: The path at which the metrics will be exposed. Default value is /metrics.
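
Once the endpoint is enabled, any Prometheus-compatible scraper can be pointed at the configured address (with the defaults above, http://<host>:9186/metrics) to collect these values periodically.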

Usage

Once enabled, a quick method to check the metrics output is to navigate to the HTTP endpoint using any common browser. A local instance of Oura with metrics enabled on port 9186 can be accessed by opening the URL http://localhost:9186

An output similar to the following should be shown by the browser:

# HELP chain_tip the last detected tip of the chain (height)
# TYPE chain_tip gauge
chain_tip 6935733
# HELP rollback_count number of rollback events occurred
# TYPE rollback_count counter
rollback_count 1
# HELP sink_current_slot last slot processed by the sink of the pipeline
# TYPE sink_current_slot gauge
sink_current_slot 2839340
# HELP sink_event_count number of events processed by the sink of the pipeline
# TYPE sink_event_count counter
sink_event_count 2277714
# HELP source_current_height last height (block #) processed by the source of the pipeline
# TYPE source_current_height gauge
source_current_height 2837810
# HELP source_current_slot last slot processed by the source of the pipeline
# TYPE source_current_slot gauge
source_current_slot 2839340
# HELP source_event_count number of events processed by the source of the pipeline
# TYPE source_event_count counter
source_event_count 2277715

Regardless of the above mechanism, the intended approach for tracking Oura's metrics is to use a monitoring infrastructure compatible with the Prometheus format. Setting up and managing the monitoring stack is outside the scope of Oura. If you don't have any infrastructure in place, we recommend checking out some of the more common stacks:

  • Prometheus Server + Grafana
  • Metricbeat + Elasticsearch + Kibana
  • Telegraf + InfluxDB + Chronograf

The following screenshot is an example of a Grafana dashboard showing Prometheus data scraped from an Oura instance:

Grafana Dashboard

Mapper Options

A set of "expensive" event mapping procedures that require an explicit opt-in to be activated.

Context

One of the main concerns of Oura is turning block / tx data into atomic events to send down the pipeline for further processing. The source stage is responsible for executing these mapping procedures.

Most of the time, this logic is generic enough that it can be reused in different scenarios. For example, the N2N and the N2C sources share the same mapping procedures. If a particular use-case needs to cherry-pick, enrich or alter the data in some way, the recommendation is to handle the transformation in downstream stages, by using any of the built-in filters or by creating new ones.

There are some exceptions, though: whenever a mapping has a heavy impact on performance, it is better to disable it completely at the source level to avoid paying the overhead associated with the initial processing of the data.

Feature

We consider a mapping procedure "expensive" if it involves handling a relatively large amount of data, computing a relatively expensive value, or generating redundant data required only for very particular use cases.

For these expensive procedures, we provide configurable options that instruct an Oura instance running in daemon mode to opt in to each particular rule.

Configuration

The mapper options can be defined by adding the following configuration in the daemon.toml file:

[source.mapper]
include_block_end_events = <bool>
include_transaction_details = <bool>
include_transaction_end_events = <bool>
include_block_cbor = <bool>
include_byron_ebb = <bool>
  • include_block_end_events: if enabled, the source will output an event signaling the end of a block, duplicating all of the data already sent in the corresponding block start event. Default value is false.
  • include_transaction_details: if enabled, each transaction event payload will contain a nested version of all of the details of the transaction (inputs, outputs, mint, assets, metadata, etc). Useful when the pipeline needs to process the tx as a unit, instead of handling each sub-object as an independent event. Default value is false.
  • include_transaction_end_events: if enabled, the source will output an event signaling the end of a transaction, duplicating all of the data already sent in the corresponding transaction start event. Default value is false.
  • include_block_cbor: if enabled, the block event will include the raw, unaltered cbor content received from the node, formatted as a hex string. Useful when some custom cbor decoding is required. Default value is false.
  • include_byron_ebb: if enabled, a block event will be emitted for the legacy epoch boundary blocks of the Byron era (deprecated in newer eras). Useful when performing validation on previous block hashes. Default value is false.
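
As a concrete illustration, a hypothetical daemon that needs full transaction details and the raw block CBOR, but none of the redundant end events, could set:

[source.mapper]
include_block_end_events = false
include_transaction_details = true
include_transaction_end_events = false
include_block_cbor = true
include_byron_ebb = false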

Intersect Options

Advanced options for instructing Oura from which point in the chain to start reading.

Feature

When running in daemon mode, Oura provides 4 different strategies for finding the intersection point within the chain sync process.

  • Origin: Oura will start reading from the beginning of the chain.
  • Tip: Oura will start reading from the current tip of the chain.
  • Point: Oura will start reading from a particular point (slot, hash) in the chain. If the point is not found, the process will be terminated with a non-zero exit code.
  • Fallbacks: Oura will start reading from the first valid point within a set of alternative positions. If a point is not valid, the process will fall back to the next available point in the list of options. If none of the points are valid, the process will be terminated with a non-zero exit code.

The default strategy used by Oura is Tip, unless an alternative option is specified via configuration.

You can also define a finalizing point by providing a block hash at which Oura will stop reading from the chain and exit gracefully.

Configuration

To modify the default behaviour used by the daemon mode, a section named [source.intersect] needs to be added in the daemon.toml file.

[source.intersect]
type = <Type>
value = <Value>
  • type: Defines which strategy to use. Valid values are Origin, Tip, Point, Fallbacks. Default value is Tip.
  • value: Either a point or an array of points to be used as argument for the selected strategy.

If you'd like to sync only a specific section of the chain, you can also instruct Oura to stop syncing when it reaches a specific block hash by defining a [source.finalize] config:

[source.finalize]
until_hash = <BlockHash>

Note that unlike the intersect point, no slot is provided for the finalizer.

Examples

The following example shows how to configure Oura to use a set of fallback intersection points. The chain-sync process will first attempt to intersect at slot 4449598. If not found, it will continue with slot 43159 and finally with slot 0.

[source.intersect]
type = "Fallbacks"
value = [
    [4449598, "2c9ba2611c5d636ecdb3077fde754413c9d6141c6288109922790e53bbb938b5"],
    [43159, "f5d398d6f71a9578521b05c43a668b06b6103f94fcf8d844d4c0aa906704b7a6"],
    [0, "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f"],
]
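
A single fixed starting point can be expressed with the Point strategy. The sketch below reuses the first point from the list above, assuming the same [slot, hash] pair format used for fallbacks:

[source.intersect]
type = "Point"
value = [4449598, "2c9ba2611c5d636ecdb3077fde754413c9d6141c6288109922790e53bbb938b5"]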

This configuration will sync the whole Byron era only:

[source.intersect]
type = "Origin"

[source.finalize]
until_hash = "aa83acbf5904c0edfe4d79b3689d3d00fcfc553cf360fd2229b98d464c28e9de"

Custom networks

Instructions on how to configure Oura for connecting to a custom network (that is, one other than mainnet / testnet).

Context

Oura requires certain information about the chain it is reading from. In a way, this is similar to the json config files required to run the Cardano node. These values are used for procedures such as encoding bech32 addresses, computing wall-clock time for blocks, etc.

Since mainnet and testnet are well-known, heavily used networks, Oura hardcodes these values as part of the binary release so that the user is spared from having to manually specify them. On the other hand, custom networks require the user to configure these values manually for Oura to establish a connection.

Feature

By adding a [chain] section in the daemon configuration file, users can provide the information required by Oura to connect to a custom network.

The [chain] section has the following properties:

Name                 | DataType | Description
byron_epoch_length   | integer  | the length (in seconds) of a Byron epoch in this network
byron_slot_length    | integer  | the length (in seconds) of a Byron slot in this network
byron_known_slot     | integer  | the slot of a Byron block known to exist in this network
byron_known_hash     | string   | the hash of the known Byron block
byron_known_time     | integer  | the unix timestamp of the known Byron block
shelley_epoch_length | integer  | the length (in seconds) of a Shelley epoch in this network
shelley_slot_length  | integer  | the length (in seconds) of a Shelley slot in this network
shelley_known_slot   | integer  | the slot of a Shelley block known to exist in this network
shelley_known_hash   | string   | the hash of the known Shelley block
shelley_known_time   | integer  | the unix timestamp of the known Shelley block
address_hrp          | string   | the human readable part for addresses of this network
adahandle_policy     | string   | the minting policy for AdaHandle on this network
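
For reference, these values allow Oura to approximate the wall-clock time of a post-Byron block as shelley_known_time + (slot - shelley_known_slot) * shelley_slot_length, with the Byron fields playing the analogous role for earlier blocks.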

Examples

Chain information for Testnet

This example configuration shows the values for Testnet. Since testnet values are hardcoded as part of Oura's release, users are not required to input these exact values anywhere, but it serves as a good example of what the configuration looks like.

[chain]
byron_epoch_length  = 432000
byron_slot_length = 20
byron_known_slot = 0
byron_known_hash = "8f8602837f7c6f8b8867dd1cbc1842cf51a27eaed2c70ef48325d00f8efb320f"
byron_known_time = 1564010416
shelley_epoch_length = 432000
shelley_slot_length = 1
shelley_known_slot = 1598400
shelley_known_hash = "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f"
shelley_known_time = 1595967616
address_hrp = "addr_test"
adahandle_policy = "8d18d786e92776c824607fd8e193ec535c79dc61ea2405ddf3b09fe3"

Retry Policy

Advanced options for instructing Oura how to deal with failed attempts in certain sinks.

Supported Sinks

Configuration

To modify the default behaviour used by the sink, a section named [sink.retry_policy] needs to be added in the daemon.toml file.

[sink.retry_policy]
max_retries = 30
backoff_unit =  5000
backoff_factor = 2
max_backoff = 100000
  • max_retries: the max number of retries before failing the whole pipeline. Default value is 20.
  • backoff_unit: the delay expressed in milliseconds between each retry. Default value is 5000.
  • backoff_factor: the factor by which the backoff delay grows after each attempt. Default value is 2.
  • max_backoff: the longest possible delay in milliseconds. Default value is 100000.
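
Assuming the backoff factor is applied multiplicatively after each failed attempt, the configuration shown above would yield delays of roughly 5s, 10s, 20s, 40s, 80s and then 100s (capped by max_backoff) for every remaining retry, up to max_retries attempts in total.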

Guides

Cardano => Kafka

This guide shows how to leverage Oura to stream data from a Cardano node into a Kafka topic.

About Kafka

Apache Kafka is an open-source distributed event-streaming platform developed by the Apache Software Foundation, written in Scala and Java.

Find more info about Kafka on Wikipedia or visit Kafka's official website.

Prerequisites

This example assumes the following prerequisites:

  • A running Cardano node, locally accessible via a unix socket.
  • A Kafka cluster accessible through the network.
  • An existing Kafka topic to output events to.
  • An Oura binary release installed on the local system.

Instructions

1. Create an Oura configuration file cardano2kafka.toml

[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "testnet"

[sink]
type = "Kafka"
brokers = ["kafka-broker-0:9092"]
topic = "cardano-events"

Some comments regarding the above configuration:

  • the [source] section tells Oura where to pull chain data from.
    • the N2C source type value tells Oura to get the data from a Cardano node using Node-to-Client mini-protocols (chain-sync instantiated to full blocks).
    • the address field indicates that we should connect via Unix socket at the specified path. This value should match the location of your local node socket path.
    • the magic field indicates that our node is running in the testnet network. Change this value to mainnet if appropriate.
  • the [sink] section tells Oura where to send the information it gathered.
    • the Kafka sink type value indicates that Oura should use a Kafka producer client as the output.
    • the brokers field indicates the location of the Kafka brokers within the network. Several hostname:port pairs can be added to the array for a "cluster" scenario.
    • the topic field indicates which Kafka topic to use for the outbound messages.

2. Run Oura in daemon mode

Run the following command from your terminal to start the daemon process:

RUST_LOG=info oura daemon --config cardano2kafka.toml

You should see an output similar to the following:

[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] handshake output: Accepted(7, VersionData { network_magic: 764824073, initiator_and_responder_diffusion_mode: false })
[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] chain point query output: Some(Tip(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"), 6624258))
[2021-12-13T22:16:43Z INFO  oura::sources::n2n::setup] node tip: Point(47867448,"f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")
[2021-12-13T22:16:44Z INFO  oura::sources::n2n] rolling block to point Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")
[2021-12-13T22:16:52Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))
[2021-12-13T22:17:15Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))
[2021-12-13T22:17:20Z INFO  oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))
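
As a quick sanity check that events are reaching the cluster, you can attach any Kafka consumer (for example, the kafka-console-consumer tool bundled with Kafka) to the cardano-events topic and confirm that event payloads start arriving as new blocks are processed.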