Skip to main content

CDP Consumer Pipeline Sample Code

Complete code for a small sample of CDP consumer pipeline of Stellar network ledger metadata using the Stellar Go Ingest SDK paired with new CDP SDK to demonstrate data pipeline from ledger metadata to derived data model with event-driven, distributed processing to sample microservice (Python script) as subscriber.

This example uses the ZeroMQ goczmq Go wrapper SDK, which requires a few o/s dependent libraries to also be installed on the host machine.

#1 - Put these files in a directory
#2 - compile and run with go mod tidy;go build -o pipeline ./.; ./pipeline
#3 - in separate terminal, run python, this will perform distributed pipeline topology, as it receives messages with payment info from the pipeline process and does additional processing(printing it to console).


module example/pipeline

go 1.22

toolchain go1.22.1

require ( v0.0.0-20241008214914-7950d4254e6a v4.1.0+incompatible


package main

import (



// Application payment model
type AppPayment struct {
Timestamp uint
BuyerAccountId string
SellerAccountId string
AssetCode string
Amount string

// application data pipeline
type Message struct {
Payload interface{}

type Processor interface {
Process(context.Context, Message) error

type Publisher interface {
Subscribe(receiver Processor)

// Ingestion Pipeline Processors
type ZeroMQOutboundAdapter struct {
Publisher *goczmq.Sock

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
_, err := adapter.Publisher.Write(msg.Payload.([]byte))
return err

type AppPaymentTransformer struct {
processors []Processor
networkPassPhrase string

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
transformer.processors = append(transformer.processors, receiver)

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())

closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

// scan all transactions in a ledger for payments to derive new model from
transaction, err := ledgerTxReader.Read()
for ; err == nil; transaction, err = ledgerTxReader.Read() {
for _, op := range transaction.Envelope.Operations() {
switch op.Body.Type {
case xdr.OperationTypePayment:
networkPayment := op.Body.MustPaymentOp()
myPayment := AppPayment{
Timestamp: closeTime,
BuyerAccountId: networkPayment.Destination.Address(),
SellerAccountId: op.SourceAccount.Address(),
AssetCode: networkPayment.Asset.StringCanonical(),
Amount: amount.String(networkPayment.Amount),
jsonBytes, err := json.Marshal(myPayment)
if err != nil {
return err

for _, processor := range transformer.processors {
processor.Process(ctx, Message{Payload: jsonBytes})
if err != io.EOF {
return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
return nil

type LedgerMetadataInboundAdapter struct {
processors []Processor
historyArchiveURLs []string
dataStoreConfig datastore.DataStoreConfig

func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)

func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {

// Note, will deprecate this
// need for this type of manual retrieval of latest ledger when
// use case is to initiate streaming from latest point.
// It proposes new mechanism to trigger ApplyLedgerMetadata() to
// perform automatic resolution of 'latest ledger' instead.
historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "payment_demo",
Context: ctx,
if err != nil {
return errors.Wrap(err, "error creating history archive client")
latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()

if err != nil {
return errors.Wrap(err, "error getting latest ledger")

ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

pubConfig := cdp.PublisherConfig{
DataStoreConfig: adapter.dataStoreConfig,
BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),

fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
func(lcm xdr.LedgerCloseMeta) error {
for _, processor := range adapter.processors {
if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
return err
return nil

func main() {
// run a data pipeline that transforms Pubnet ledger metadata into payment events
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()

cfg, err := toml.LoadFile("config.toml")
if err != nil {
fmt.Printf("config.toml shoule be accessible in current directdory: %v\n", err)

datastoreConfig := datastore.DataStoreConfig{}
// Unmarshal TOML data into the Config struct
if err = cfg.Unmarshal(&datastoreConfig); err != nil {
fmt.Printf("error unmarshalling TOML config: %v\n", err)

// create the inbound source of pubnet ledger metadata
ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
dataStoreConfig: datastoreConfig,

// create the app transformer to convert network data to application data model
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://")
if err != nil {
log.Printf("error creating 0MQ publisher: %v\n", err)
defer publisher.Destroy()
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

// wire up the ingestion pipeline and let it run
log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))


The CDP configuration settings, this file defines the data storage which contains the pre-generated Ledger Metadata files. Google Cloud storage is used in this example.

type = "GCS"

destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"

ledgers_per_file = 1
files_per_partition = 64000

A Python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. Make sure to pip install pyzmq

import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting payments from pipeline ...")

while True:

message = socket.recv()
json_object = json.loads(message)
json_formatted_str = json.dumps(json_object, indent=2)
print(f"Received payment:\n\n{json_formatted_str}")