CDP Consumer Pipeline Sample Code

Complete code for a small sample consumer pipeline of Stellar network ledger metadata. It uses the Stellar Go Ingest SDK to demonstrate a data pipeline from ledger metadata to a derived data model, with event-driven, distributed processing out to a sample microservice (a Python script) as subscriber. The stages are: ledger metadata lake → LedgerMetadataInboundAdapter → AppPaymentTransformer → ZeroMQOutboundAdapter → message broker → Python subscriber.

This example uses goczmq, a Go wrapper SDK for ZeroMQ, which requires a few OS-dependent libraries to also be installed on the host machine.
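
For example, installing the czmq headers and libraries (package names vary by platform; check the goczmq README for your system):

# Debian/Ubuntu
sudo apt-get install libczmq-dev
# macOS (Homebrew)
brew install czmq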

This example requires access to a public ledger metadata lake that is actively populated with the latest ledgers from Stellar mainnet. For purposes of the example, it uses a reference data lake hosted on AWS Open Data S3.
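
The bucket is public, so no AWS credentials are needed. As a quick sanity check before running the pipeline (assuming the AWS CLI is installed), you can list the lake:

aws s3 ls --no-sign-request s3://aws-public-blockchain/v1.1/stellar/ledgers/pubnet/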

Step# 1 - Create an example directory and copy the following files to your workstation.

mkdir pipeline-example; cd pipeline-example

main.go


package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "os"
    "os/signal"

    "github.com/pelletier/go-toml"
    "github.com/pkg/errors"
    "github.com/stellar/go-stellar-sdk/amount"
    "github.com/stellar/go-stellar-sdk/historyarchive"
    "github.com/stellar/go-stellar-sdk/ingest"
    "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
    "github.com/stellar/go-stellar-sdk/network"
    "github.com/stellar/go-stellar-sdk/support/datastore"
    "github.com/stellar/go-stellar-sdk/support/log"
    "github.com/stellar/go-stellar-sdk/support/storage"
    "github.com/stellar/go-stellar-sdk/xdr"

    "github.com/zeromq/goczmq"
)

// AppPayment is the derived application payment model.
type AppPayment struct {
    Timestamp       uint
    BuyerAccountId  string
    SellerAccountId string
    AssetCode       string
    Amount          string
}

// Message is the unit of data passed through the application pipeline.
type Message struct {
    Payload interface{}
}

// Processor consumes messages from an upstream pipeline stage.
type Processor interface {
    Process(context.Context, Message) error
}

// Publisher emits messages to downstream subscribed processors.
type Publisher interface {
    Subscribe(receiver Processor)
}

// Ingestion Pipeline Processors

// ZeroMQOutboundAdapter is the endpoint of the pipeline; it publishes
// serialized messages to a ZeroMQ PUB socket.
type ZeroMQOutboundAdapter struct {
    Publisher *goczmq.Sock
}

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
    _, err := adapter.Publisher.Write(msg.Payload.([]byte))
    return err
}

// AppPaymentTransformer converts ledger close metadata into the
// application payment model and forwards it to subscribed processors.
type AppPaymentTransformer struct {
    processors        []Processor
    networkPassPhrase string
}

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
    transformer.processors = append(transformer.processors, receiver)
}

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
    ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
    ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
    if err != nil {
        return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
    }

    closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

    // scan all transactions in a ledger for payments to derive new model from
    counter := 0
    transaction, err := ledgerTxReader.Read()
    for ; err == nil; transaction, err = ledgerTxReader.Read() {
        for _, op := range transaction.Envelope.Operations() {
            switch op.Body.Type {
            case xdr.OperationTypePayment:
                networkPayment := op.Body.MustPaymentOp()
                // fall back to the transaction source account when the
                // operation has no explicit source account of its own
                sellerAccount := transaction.Envelope.SourceAccount()
                if op.SourceAccount != nil {
                    sellerAccount = *op.SourceAccount
                }
                myPayment := AppPayment{
                    Timestamp:       closeTime,
                    BuyerAccountId:  networkPayment.Destination.Address(),
                    SellerAccountId: sellerAccount.Address(),
                    AssetCode:       networkPayment.Asset.StringCanonical(),
                    Amount:          amount.String(networkPayment.Amount),
                }
                jsonBytes, err := json.Marshal(myPayment)
                if err != nil {
                    return err
                }

                for _, processor := range transformer.processors {
                    if err = processor.Process(ctx, Message{Payload: jsonBytes}); err != nil {
                        return err
                    }
                }
                counter++
            }
        }
    }

    if err != io.EOF {
        return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
    }

    log.Infof("Published %v payments from ledger sequence %v", counter, ledgerCloseMeta.LedgerSequence())
    return nil
}

// LedgerMetadataInboundAdapter is the inbound source of the pipeline;
// it streams ledger close metadata from the data lake to subscribers.
type LedgerMetadataInboundAdapter struct {
    processors         []Processor
    historyArchiveURLs []string
    dataStoreConfig    datastore.DataStoreConfig
}

func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
    adapter.processors = append(adapter.processors, receiver)
}

func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {
    // Get the latest ledger from the network.
    historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
        ConnectOptions: storage.ConnectOptions{
            UserAgent: "payment_demo",
            Context:   ctx,
        },
    })
    if err != nil {
        return errors.Wrap(err, "error creating history archive client")
    }

    latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
    if err != nil {
        return errors.Wrap(err, "error getting latest ledger")
    }

    ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

    pubConfig := ingest.PublisherConfig{
        DataStoreConfig:       adapter.dataStoreConfig,
        BufferedStorageConfig: ingest.DefaultBufferedStorageBackendConfig(1),
    }

    log.Infof("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
    return ingest.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
        func(lcm xdr.LedgerCloseMeta) error {
            for _, processor := range adapter.processors {
                if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
                    return err
                }
            }
            return nil
        })
}

func main() {
    // run a data pipeline that transforms Pubnet ledger metadata into payment events
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
    defer stop()
    log.SetLevel(log.InfoLevel)

    cfg, err := toml.LoadFile("config.toml")
    if err != nil {
        fmt.Printf("config.toml should be accessible in the current directory: %v\n", err)
        return
    }

    // Unmarshal TOML data into the DataStoreConfig struct
    datastoreConfig := datastore.DataStoreConfig{}
    if err = cfg.Unmarshal(&datastoreConfig); err != nil {
        fmt.Printf("error unmarshalling TOML config: %v\n", err)
        return
    }

    // create the inbound source of pubnet ledger metadata
    ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
        historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
        dataStoreConfig:    datastoreConfig,
    }

    // create the app transformer to convert network data to the application data model
    appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

    // create the outbound adapter; this is the endpoint of the pipeline,
    // it publishes the application data model as messages to a broker
    publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
    if err != nil {
        log.Errorf("error creating 0MQ publisher: %v\n", err)
        return
    }
    defer publisher.Destroy()
    outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

    // wire up the ingestion pipeline and let it run
    appTransformer.Subscribe(outboundAdapter)
    ledgerMetadataInboundAdapter.Subscribe(appTransformer)
    log.Infof("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}

config.toml

The CDP configuration settings. This file defines the data storage that contains the pre-generated ledger metadata files; the AWS S3 public blockchain bucket for Stellar Pubnet is used in this example.

type = "S3"

[params]
destination_bucket_path = "aws-public-blockchain/v1.1/stellar/ledgers/pubnet"
region = "us-east-2"

# this is the schema specific to the public s3 data lake being used
[schema]
ledgers_per_file = 1
files_per_partition = 64000
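
With ledgers_per_file = 1 and files_per_partition = 64000, each partition of the data lake spans 64,000 ledgers. A minimal sketch of that arithmetic, for orientation only (partitionFor is a hypothetical helper; the datastore package resolves actual object paths internally):

// partitionFor returns the zero-based partition index that a ledger
// sequence falls into under the schema above.
func partitionFor(ledgerSeq, ledgersPerFile, filesPerPartition uint32) uint32 {
    return ledgerSeq / (ledgersPerFile * filesPerPartition)
}

// e.g. partitionFor(52000000, 1, 64000) == 812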

distributed_payment_subscriber.py

A Python script demonstrating the distributed, event-driven processing we now have: the MQ broker pushes the derived application payment data model out to other microservices such as this one. Make sure to pip install pyzmq first.

import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")

while True:
    message = socket.recv()
    json_object = json.loads(message)
    json_formatted_str = json.dumps(json_object, indent=2)
    print(f"Received payment:\n\n{json_formatted_str}")

Step# 2 - Compile and run the ingestion pipeline example.

go mod init example/pipeline
go get github.com/stellar/go-stellar-sdk@latest github.com/zeromq/goczmq@v4
go mod tidy
go build -o pipeline .
AWS_SHARED_CREDENTIALS_FILE=/dev/null ./pipeline

Step# 3 - Run the distributed pipeline consumer

In a separate terminal, run python distributed_payment_subscriber.py. This demonstrates the distributed pipeline topology: the subscriber receives messages with payment info from the pipeline process and performs additional processing (printing them to the console).
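
Each message is a JSON-serialized AppPayment from main.go; an illustrative payload, with placeholder values, looks like:

{
  "Timestamp": 1700000000,
  "BuyerAccountId": "GB...",
  "SellerAccountId": "GA...",
  "AssetCode": "USDC:GA...",
  "Amount": "10.0000000"
}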