Skip to main content

CDP Consumer Pipeline Sample Code

Complete code for a small sample of CDP consumer pipeline of Stellar network ledger metadata using the Stellar Go Ingest SDK paired with new CDP SDK to demonstrate data pipeline from ledger metadata to derived data model with event-driven, distributed processing to sample microservice (Python script) as subscriber.

This example uses the ZeroMQ goczmq Go wrapper SDK, which requires a few o/s dependent libraries to also be installed on the host machine.

Steps:
#1 - Put these files in a directory
#2 - compile and run with go mod tidy;go build -o pipeline ./.; ./pipeline
#3 - in separate terminal, run python distributed_payment_subsciber.py, this will perform distributed pipeline topology, as it receives messages with payment info from the pipeline process and does additional processing(printing it to console).

go.mod​

module example/pipeline

go 1.22

toolchain go1.22.1

require (
github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
github.com/zeromq/goczmq v4.1.0+incompatible
)

main.go​


package main

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/signal"

"github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/cdp"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/zeromq/goczmq"
)

// Application payment model
type AppPayment struct {
Timestamp uint
BuyerAccountId string
SellerAccountId string
AssetCode string
Amount string
}

// application data pipeline
type Message struct {
Payload interface{}
}

type Processor interface {
Process(context.Context, Message) error
}

type Publisher interface {
Subscribe(receiver Processor)
}

// Ingestion Pipeline Processors
type ZeroMQOutboundAdapter struct {
Publisher *goczmq.Sock
}

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
_, err := adapter.Publisher.Write(msg.Payload.([]byte))
return err
}

type AppPaymentTransformer struct {
processors []Processor
networkPassPhrase string
}

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
transformer.processors = append(transformer.processors, receiver)
}

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
}

closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

// scan all transactions in a ledger for payments to derive new model from
transaction, err := ledgerTxReader.Read()
for ; err == nil; transaction, err = ledgerTxReader.Read() {
for _, op := range transaction.Envelope.Operations() {
switch op.Body.Type {
case xdr.OperationTypePayment:
networkPayment := op.Body.MustPaymentOp()
myPayment := AppPayment{
Timestamp: closeTime,
BuyerAccountId: networkPayment.Destination.Address(),
SellerAccountId: op.SourceAccount.Address(),
AssetCode: networkPayment.Asset.StringCanonical(),
Amount: amount.String(networkPayment.Amount),
}
jsonBytes, err := json.Marshal(myPayment)
if err != nil {
return err
}

for _, processor := range transformer.processors {
processor.Process(ctx, Message{Payload: jsonBytes})
}
}
}
}
if err != io.EOF {
return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
}
return nil
}

type LedgerMetadataInboundAdapter struct {
processors []Processor
historyArchiveURLs []string
dataStoreConfig datastore.DataStoreConfig
}

func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)
}

func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {

/////////////////////////////////////////////////////////////////
// Note, https://github.com/stellar/go/issues/5495 will deprecate this
// need for this type of manual retrieval of latest ledger when
// use case is to initiate streaming from latest point.
// It proposes new mechanism to trigger ApplyLedgerMetadata() to
// perform automatic resolution of 'latest ledger' instead.
historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "payment_demo",
Context: ctx,
},
})
if err != nil {
return errors.Wrap(err, "error creating history archive client")
}
latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
/////////////////////////////////////////////////////////////////////

if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}

ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

pubConfig := cdp.PublisherConfig{
DataStoreConfig: adapter.dataStoreConfig,
BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
}

fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
func(lcm xdr.LedgerCloseMeta) error {
for _, processor := range adapter.processors {
if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
return err
}
}
return nil
})
}

func main() {
// run a data pipeline that transforms Pubnet ledger metadata into payment events
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()

cfg, err := toml.LoadFile("config.toml")
if err != nil {
fmt.Printf("config.toml shoule be accessible in current directdory: %v\n", err)
return
}

datastoreConfig := datastore.DataStoreConfig{}
// Unmarshal TOML data into the Config struct
if err = cfg.Unmarshal(&datastoreConfig); err != nil {
fmt.Printf("error unmarshalling TOML config: %v\n", err)
return
}

// create the inbound source of pubnet ledger metadata
ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
dataStoreConfig: datastoreConfig,
}

// create the app transformer to convert network data to application data model
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
if err != nil {
log.Printf("error creating 0MQ publisher: %v\n", err)
return
}
defer publisher.Destroy()
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

// wire up the ingestion pipeline and let it run
appTransformer.Subscribe(outboundAdapter)
ledgerMetadataInboundAdapter.Subscribe(appTransformer)
log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}

config.toml​

The CDP configuration settings, this file defines the data storage which contains the pre-generated Ledger Metadata files. Google Cloud storage is used in this example.

type = "GCS"

[params]
destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"

[schema]
ledgers_per_file = 1
files_per_partition = 64000

distributed_payment_subsciber.py​

A Python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. Make sure to pip install pyzmq

import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")

while True:

message = socket.recv()
json_object = json.loads(message)
json_formatted_str = json.dumps(json_object, indent=2)
print(f"Received payment:\n\n{json_formatted_str}")