Cardano => Kafka
This guide shows how to leverage Oura to stream data from a Cardano node into a Kafka topic.
About Kafka
Apache Kafka is a framework implementation of a software bus using stream-processing. It is an open-source software platform developed by the Apache Software Foundation written in Scala and Java.
Find more info about Kafka in wikipedia or visit Kafka's official website
Prerequisites
This examples assumes the following prerequisites:
- A running Cardano node locally accesible via a unix socket.
- A Kafka cluster accesible through the network.
- An already existing Kafka topic where to output events
- Oura binary release installed in local system
Instructions
1. Create an Oura configuration file cardano2kafka.toml
[source]
type = "N2C"
address = ["Unix", "/opt/cardano/cnode/sockets/node0.socket"]
magic = "testnet"
[sink]
type = "Kafka"
brokers = ["kafka-broker-0:9092"]
topic = "cardano-events"
Some comments regarding the above configuration:
- the
[source]
section indicates Oura from where to pull chain data.- the
N2C
source type value tells Oura to get the data from a Cardano node using Node-to-Client mini-protocols (chain-sync instantiated to full blocks). - the
address
field indicates that we should connect viaUnix
socket at the specified path. This value should match the location of your local node socket path. - the
magic
field indicates that our node is running in thetestnet
network. Change this value tomainnet
if appropriate.
- the
- the
[sink]
section tells Oura where to send the information it gathered.- the
Kafka
sink type value indicates that Oura should use a Kafka producer client as the output - the
brokers
field indicates the location of the Kafka brokers within the network. Several hostname:port pairs can be added to the array for a "cluster" scenario. - the
topic
fields indicates which Kafka topic to used for the outbound messages.
- the
2. Run Oura in daemon
mode
Run the following command from your terminal to start the daemon process:
RUST_LOG=info oura daemon --config cardano2kafka.toml
You should see an output similar to the following:
[2021-12-13T22:16:43Z INFO oura::sources::n2n::setup] handshake output: Accepted(7, VersionData { network_magic: 764824073, initiator_and_responder_diffusion_mode: false })
[2021-12-13T22:16:43Z INFO oura::sources::n2n::setup] chain point query output: Some(Tip(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"), 6624258))
[2021-12-13T22:16:43Z INFO oura::sources::n2n::setup] node tip: Point(47867448,"f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")
[2021-12-13T22:16:44Z INFO oura::sources::n2n] rolling block to point Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6")
[2021-12-13T22:16:52Z INFO oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))
[2021-12-13T22:17:15Z INFO oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))
[2021-12-13T22:17:20Z INFO oura::sources::n2n] requesting block fetch for point Some(Point(47867448, "f170baa5702c91b23580291c3a184195df7c77d3e1a03b3d6424793aacc850d6"))