Saltar al contenido principal

Código de ejemplo de la canalización del consumidor CDP

Código completo de un pequeño ejemplo de canalización de consumidor CDP para los metadatos de ledger de la red Stellar. Utiliza el Stellar Go Ingest SDK junto con el nuevo CDP SDK para demostrar el flujo de datos desde los metadatos del ledger hasta un modelo de datos derivado, con procesamiento distribuido y basado en eventos, que se entrega a un microservicio de ejemplo (un script de Python) que actúa como suscriptor.

Este ejemplo utiliza goczmq, el SDK envoltorio de Go para ZeroMQ, que requiere que varias bibliotecas dependientes del sistema operativo también estén instaladas en la máquina host.

Pasos:
#1 - Coloca estos archivos en un directorio
#2 - compila y ejecuta con go mod tidy;go build -o pipeline ./.; ./pipeline
#3 - en una terminal separada, ejecuta python distributed_payment_subsciber.py; esto completa la topología de la canalización distribuida: el script recibe del proceso de canalización los mensajes con información de pago y realiza un procesamiento adicional (imprimirlos en la consola).

go.mod

module example/pipeline

go 1.22

toolchain go1.22.1

// main.go also imports github.com/pelletier/go-toml and github.com/pkg/errors
// directly; they are not listed here, so `go mod tidy` (step #2) must be run
// to add them together with the indirect dependencies.
require (
github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
github.com/zeromq/goczmq v4.1.0+incompatible
)

main.go


package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/pelletier/go-toml"
	"github.com/pkg/errors"
	"github.com/stellar/go/amount"
	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/ingest"
	"github.com/stellar/go/ingest/cdp"
	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/network"
	"github.com/stellar/go/support/datastore"
	"github.com/stellar/go/support/storage"
	"github.com/stellar/go/xdr"
	"github.com/zeromq/goczmq"
)

// AppPayment is the application-level payment record derived from a payment
// operation found in ledger metadata. It is JSON-encoded with the field names
// below before being handed to downstream processors.
type AppPayment struct {
	// Timestamp is the close time of the ledger that contained the payment.
	Timestamp uint
	// BuyerAccountId and SellerAccountId are account addresses, AssetCode is
	// the asset in its canonical string form and Amount is the payment amount
	// rendered as a string.
	BuyerAccountId, SellerAccountId, AssetCode, Amount string
}

// Core abstractions of the application data pipeline.
type (
	// Message is the envelope passed between pipeline stages. Payload carries
	// whatever value the producing stage emits; consumers type-assert it.
	Message struct {
		Payload any
	}

	// Processor is a pipeline stage that consumes one Message at a time.
	Processor interface {
		Process(ctx context.Context, msg Message) error
	}

	// Publisher is a pipeline stage that other stages can subscribe to.
	Publisher interface {
		Subscribe(receiver Processor)
	}
)

// Ingestion Pipeline Processors
// ZeroMQOutboundAdapter is the terminal stage of the pipeline: it writes each
// message payload to a ZeroMQ socket.
type ZeroMQOutboundAdapter struct {
	// Publisher is the socket that payloads are written to.
	Publisher *goczmq.Sock
}

// Process writes msg.Payload to the adapter's socket and returns the write
// error, if any. The payload must be a []byte; any other payload type is
// reported as an error rather than triggering a type-assertion panic.
// ctx is not used.
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
	payload, ok := msg.Payload.([]byte)
	if !ok {
		return fmt.Errorf("zeromq outbound adapter: unsupported payload type %T, want []byte", msg.Payload)
	}
	_, err := adapter.Publisher.Write(payload)
	return err
}

// AppPaymentTransformer is the pipeline stage that turns ledger metadata into
// application payment messages for its subscribers.
type AppPaymentTransformer struct {
	// processors are the downstream stages registered through Subscribe.
	processors []Processor
	// networkPassPhrase is handed to the ingest transaction reader.
	networkPassPhrase string
}

// Subscribe registers receiver as a downstream stage of the transformer.
// Receivers are kept in registration order.
func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
	subscribers := append(transformer.processors, receiver)
	transformer.processors = subscribers
}

// Process derives AppPayment messages from one ledger and forwards them to
// every subscribed processor.
//
// msg.Payload must be an xdr.LedgerCloseMeta. Every payment operation of
// every successful transaction in that ledger is converted to an AppPayment,
// JSON-encoded and passed to the subscribers in registration order. The first
// error from the transaction reader, the JSON encoder or a subscriber aborts
// processing of the ledger and is returned.
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
	ledgerCloseMeta, ok := msg.Payload.(xdr.LedgerCloseMeta)
	if !ok {
		return errors.Errorf("unexpected payload type %T, want xdr.LedgerCloseMeta", msg.Payload)
	}
	ledgerSeq := ledgerCloseMeta.LedgerSequence()

	ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
	if err != nil {
		return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerSeq)
	}

	closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

	// Scan all transactions in the ledger for payments to derive the new
	// model from.
	for {
		transaction, readErr := ledgerTxReader.Read()
		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return errors.Wrapf(readErr, "failed to read transaction from ledger %v", ledgerSeq)
		}

		// The reader also yields failed transactions; their operations were
		// never applied, so they must not produce payment events.
		if !transaction.Result.Successful() {
			continue
		}

		txSource := transaction.Envelope.SourceAccount()
		for _, op := range transaction.Envelope.Operations() {
			if op.Body.Type != xdr.OperationTypePayment {
				continue
			}
			networkPayment := op.Body.MustPaymentOp()

			// op.SourceAccount is optional: when it is nil the operation
			// inherits the source account of the enclosing transaction.
			source := txSource
			if op.SourceAccount != nil {
				source = *op.SourceAccount
			}

			// NOTE(review): the payment's destination is recorded as the
			// buyer and its source as the seller; confirm this matches the
			// intended business meaning.
			myPayment := AppPayment{
				Timestamp:       closeTime,
				BuyerAccountId:  networkPayment.Destination.Address(),
				SellerAccountId: source.Address(),
				AssetCode:       networkPayment.Asset.StringCanonical(),
				Amount:          amount.String(networkPayment.Amount),
			}
			jsonBytes, marshalErr := json.Marshal(myPayment)
			if marshalErr != nil {
				return marshalErr
			}

			for _, processor := range transformer.processors {
				if procErr := processor.Process(ctx, Message{Payload: jsonBytes}); procErr != nil {
					return errors.Wrapf(procErr, "failed to forward payment from ledger %v", ledgerSeq)
				}
			}
		}
	}
}

// LedgerMetadataInboundAdapter is the head of the pipeline: it reads ledger
// metadata from a datastore and hands each ledger to its subscribers.
type LedgerMetadataInboundAdapter struct {
	// processors are the downstream stages registered through Subscribe.
	processors []Processor
	// historyArchiveURLs are the archives queried for the latest ledger.
	historyArchiveURLs []string
	// dataStoreConfig describes the datastore holding the ledger metadata.
	dataStoreConfig datastore.DataStoreConfig
}

// Subscribe registers receiver as a downstream stage of the adapter.
// Receivers are kept in registration order.
func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
	subscribers := append(adapter.processors, receiver)
	adapter.processors = subscribers
}

// Run streams ledger metadata to the subscribed processors.
//
// It asks the history archives for the latest ledger sequence, then applies
// an unbounded ledger range starting at that ledger through
// cdp.ApplyLedgerMetadata, passing every xdr.LedgerCloseMeta to each
// processor in registration order. It returns an error if the archive lookup
// fails; otherwise it returns whatever cdp.ApplyLedgerMetadata returns. The
// first processor error is returned from the callback.
func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {

	/////////////////////////////////////////////////////////////////
	// Note, https://github.com/stellar/go/issues/5495 will deprecate this
	// need for this type of manual retrieval of latest ledger when
	// use case is to initiate streaming from latest point.
	// It proposes new mechanism to trigger ApplyLedgerMetadata() to
	// perform automatic resolution of 'latest ledger' instead.
	historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
		ConnectOptions: storage.ConnectOptions{
			UserAgent: "payment_demo",
			Context:   ctx,
		},
	})
	if err != nil {
		return errors.Wrap(err, "error creating history archive client")
	}
	latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
	if err != nil {
		return errors.Wrap(err, "error getting latest ledger")
	}
	/////////////////////////////////////////////////////////////////////

	ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

	pubConfig := cdp.PublisherConfig{
		DataStoreConfig:       adapter.dataStoreConfig,
		BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
	}

	fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
	return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
		func(lcm xdr.LedgerCloseMeta) error {
			for _, processor := range adapter.processors {
				// Use a callback-local error instead of assigning to the
				// enclosing function's err from inside the closure.
				if err := processor.Process(ctx, Message{Payload: lcm}); err != nil {
					return err
				}
			}
			return nil
		})
}

// main wires up and runs a data pipeline that transforms Pubnet ledger
// metadata into payment events: inbound ledger metadata adapter -> payment
// transformer -> ZeroMQ outbound adapter. It reads the datastore settings
// from config.toml in the current directory and runs until the pipeline
// returns or the process is asked to stop.
func main() {
	// Cancel the pipeline on Ctrl-C or a termination request. os.Kill
	// (SIGKILL) cannot be intercepted by a process, so SIGTERM is the signal
	// to listen for instead.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	cfg, err := toml.LoadFile("config.toml")
	if err != nil {
		fmt.Printf("config.toml should be accessible in current directory: %v\n", err)
		return
	}

	datastoreConfig := datastore.DataStoreConfig{}
	// Unmarshal TOML data into the Config struct
	if err = cfg.Unmarshal(&datastoreConfig); err != nil {
		fmt.Printf("error unmarshalling TOML config: %v\n", err)
		return
	}

	// create the inbound source of pubnet ledger metadata
	ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
		historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
		dataStoreConfig:    datastoreConfig,
	}

	// create the app transformer to convert network data to application data model
	appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

	// create the outbound adapter, this is the end point of the pipeline
	// publishes application data model as messages to a broker
	publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
	if err != nil {
		log.Printf("error creating 0MQ publisher: %v\n", err)
		return
	}
	defer publisher.Destroy()
	outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

	// wire up the ingestion pipeline and let it run; Run blocks until the
	// stream ends, fails, or ctx is cancelled by one of the signals above
	appTransformer.Subscribe(outboundAdapter)
	ledgerMetadataInboundAdapter.Subscribe(appTransformer)
	log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}

config.toml

Configuración de CDP: este archivo define el almacén de datos que contiene los archivos de metadatos del ledger pregenerados. En este ejemplo se utiliza Google Cloud Storage.

# Datastore backend type; this example uses Google Cloud Storage.
type = "GCS"

[params]
# Placeholder value: replace with the bucket (and optional prefix) that holds
# the pre-generated ledger metadata files.
destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"

[schema]
# ledgers_per_file is also used by the pipeline to size its buffered storage
# backend. NOTE(review): both values presumably must match how the files in
# the bucket were written - confirm against the exporter's configuration.
ledgers_per_file = 1
files_per_partition = 64000

distributed_payment_subsciber.py

Un script de Python que demuestra cómo se obtiene procesamiento distribuido y una arquitectura impulsada por eventos al aprovechar el broker MQ para enviar el modelo de datos de pago derivado a otros microservicios. Asegúrate de instalar la dependencia con pip install pyzmq.

# Subscriber side of the payment pipeline: connects to the pipeline's ZeroMQ
# publisher endpoint and pretty-prints every JSON payment message it receives.
import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
# An empty prefix subscribes to every published message.
socket.subscribe("")

try:
    while True:
        message = socket.recv()
        json_object = json.loads(message)
        json_formatted_str = json.dumps(json_object, indent=2)
        print(f"Received payment:\n\n{json_formatted_str}")
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the receive loop; leave without a traceback.
    print("Stopping payment subscriber ...")
finally:
    # Release the socket and context; linger=0 drops anything still queued so
    # that terminating the context cannot block.
    socket.close(linger=0)
    context.term()