Código de ejemplo de la canalización del consumidor CDP
Código completo para un pequeño ejemplo de un pipeline consumidor de metadatos del ledger de la red Stellar usando el Ingest SDK de Stellar Go para demostrar el pipeline de datos desde metadatos del ledger hasta un modelo de datos derivado con procesamiento distribuido y basado en eventos hacia un microservicio de muestra (script Python) como suscriptor.
Este ejemplo utiliza goczmq, el SDK envoltorio de Go para ZeroMQ, que requiere que varias bibliotecas dependientes del sistema operativo también estén instaladas en la máquina host.
Este ejemplo requiere tener acceso a un lago de metadatos de ledger público que se esté actualizando activamente con los últimos ledgers de la mainnet de Stellar. Para los propósitos del ejemplo, se utiliza un lago de datos de referencia alojado en AWS Open Data S3.
Paso #1 - Crea un directorio de ejemplo y copia los siguientes archivos a tu estación de trabajo.
- bash
mkdir pipeline-example; cd pipeline-example
main.go
- Go
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/signal"
"github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/stellar/go-stellar-sdk/amount"
"github.com/stellar/go-stellar-sdk/historyarchive"
"github.com/stellar/go-stellar-sdk/ingest"
"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
"github.com/stellar/go-stellar-sdk/network"
"github.com/stellar/go-stellar-sdk/support/datastore"
"github.com/stellar/go-stellar-sdk/support/storage"
"github.com/stellar/go-stellar-sdk/xdr"
"github.com/stellar/go-stellar-sdk/support/log"
"github.com/zeromq/goczmq"
)
// AppPayment is the application-level payment model that the pipeline
// derives from payment operations and publishes as JSON.
type AppPayment struct {
	// Timestamp carries the close time of the ledger the payment was found in.
	Timestamp uint
	// Account addresses of the two parties of the payment.
	BuyerAccountId, SellerAccountId string
	// AssetCode identifies the asset; Amount is its formatted quantity.
	AssetCode, Amount string
}
// Application data pipeline building blocks.
type (
	// Message is the envelope handed from one pipeline stage to the next.
	// Payload is untyped; each stage asserts the concrete type it expects.
	Message struct {
		Payload interface{}
	}

	// Processor is a pipeline stage that consumes a Message.
	Processor interface {
		Process(context.Context, Message) error
	}

	// Publisher is a pipeline stage that other stages can subscribe to.
	Publisher interface {
		Subscribe(receiver Processor)
	}
)
// Ingestion Pipeline Processors

// ZeroMQOutboundAdapter is the end point of the pipeline: it writes every
// message payload it receives to a ZeroMQ socket.
type ZeroMQOutboundAdapter struct {
	// Publisher is the socket the payload bytes are written to.
	Publisher *goczmq.Sock
}

// Process writes msg.Payload to the publisher socket.
//
// msg.Payload must hold a []byte. Any other payload type is reported as an
// error rather than panicking on the type assertion. The error from the
// socket write, if any, is returned unchanged.
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
	payload, ok := msg.Payload.([]byte)
	if !ok {
		return fmt.Errorf("zeromq outbound adapter: unexpected payload type %T, want []byte", msg.Payload)
	}
	_, err := adapter.Publisher.Write(payload)
	return err
}
// AppPaymentTransformer is the pipeline stage that turns ledger metadata
// into application payment messages and fans them out to its subscribers.
type AppPaymentTransformer struct {
	// processors are the downstream stages registered through Subscribe.
	processors []Processor
	// networkPassPhrase is handed to the ledger transaction reader.
	networkPassPhrase string
}

// Subscribe registers receiver as a downstream stage of the transformer.
func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
	downstream := append(transformer.processors, receiver)
	transformer.processors = downstream
}
// Process scans every transaction of the ledger carried by msg for payment
// operations, converts each one into an AppPayment, serializes it to JSON and
// hands the bytes to every subscribed processor.
//
// msg.Payload must hold an xdr.LedgerCloseMeta; any other payload type is
// reported as an error instead of panicking. Processing stops at the first
// error: a transaction read failure, a JSON encoding failure, or an error
// returned by a downstream processor.
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
	ledgerCloseMeta, ok := msg.Payload.(xdr.LedgerCloseMeta)
	if !ok {
		return errors.Errorf("unexpected payload type %T, want xdr.LedgerCloseMeta", msg.Payload)
	}

	ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
	if err != nil {
		return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
	}

	closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

	// scan all transactions in a ledger for payments to derive new model from
	counter := 0
	for {
		transaction, err := ledgerTxReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
		}

		// Kept in a local so the pointer-receiver Address method can be called on it.
		txSource := transaction.Envelope.SourceAccount()

		for _, op := range transaction.Envelope.Operations() {
			if op.Body.Type != xdr.OperationTypePayment {
				continue
			}
			networkPayment := op.Body.MustPaymentOp()

			// The operation-level source account is optional; when it is
			// absent the operation runs on behalf of the transaction source.
			sourceAddress := txSource.Address()
			if op.SourceAccount != nil {
				sourceAddress = op.SourceAccount.Address()
			}

			// NOTE(review): the payment destination is mapped to the buyer and
			// the payment source to the seller, as in the original sample —
			// confirm this is the intended direction.
			myPayment := AppPayment{
				Timestamp:       closeTime,
				BuyerAccountId:  networkPayment.Destination.Address(),
				SellerAccountId: sourceAddress,
				AssetCode:       networkPayment.Asset.StringCanonical(),
				Amount:          amount.String(networkPayment.Amount),
			}
			jsonBytes, err := json.Marshal(myPayment)
			if err != nil {
				return err
			}

			for _, processor := range transformer.processors {
				// Surface downstream failures instead of silently dropping them.
				if err := processor.Process(ctx, Message{Payload: jsonBytes}); err != nil {
					return errors.Wrapf(err, "failed to publish payment from ledger %v", ledgerCloseMeta.LedgerSequence())
				}
			}
			counter++
		}
	}

	log.Infof("Published %v payments from ledger sequence %v", counter, ledgerCloseMeta.LedgerSequence())
	return nil
}
// LedgerMetadataInboundAdapter is the source stage of the pipeline: it
// delivers ledger metadata to the processors subscribed to it.
type LedgerMetadataInboundAdapter struct {
	// processors are the downstream stages registered through Subscribe.
	processors []Processor
	// historyArchiveURLs are the archives queried for the latest ledger.
	historyArchiveURLs []string
	// dataStoreConfig describes the datastore the ledger metadata is read from.
	dataStoreConfig datastore.DataStoreConfig
}

// Subscribe registers receiver as a downstream stage of the adapter.
func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
	downstream := append(adapter.processors, receiver)
	adapter.processors = downstream
}
// Run looks up the latest ledger sequence through the configured history
// archives and then applies ledger metadata, starting at that ledger with an
// unbounded range, to every subscribed processor in subscription order.
//
// It returns an error when the archive client cannot be created, when the
// latest ledger cannot be determined, or whatever ingest.ApplyLedgerMetadata
// returns. The first processor error is returned from the callback.
func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {
	// Get the latest ledger from the network.
	archiveOptions := historyarchive.ArchiveOptions{
		ConnectOptions: storage.ConnectOptions{
			UserAgent: "payment_demo",
			Context:   ctx,
		},
	}
	archivePool, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, archiveOptions)
	if err != nil {
		return errors.Wrap(err, "error creating history archive client")
	}

	startLedger, err := archivePool.GetLatestLedgerSequence()
	if err != nil {
		return errors.Wrap(err, "error getting latest ledger")
	}

	streamRange := ledgerbackend.UnboundedRange(startLedger)
	publisherConfig := ingest.PublisherConfig{
		DataStoreConfig:       adapter.dataStoreConfig,
		BufferedStorageConfig: ingest.DefaultBufferedStorageBackendConfig(1),
	}

	// Fan each ledger out to the subscribers, stopping at the first failure.
	fanOut := func(lcm xdr.LedgerCloseMeta) error {
		for _, next := range adapter.processors {
			if procErr := next.Process(ctx, Message{Payload: lcm}); procErr != nil {
				return procErr
			}
		}
		return nil
	}

	log.Infof("beginning payments stream, starting at ledger %v ...\n", startLedger)
	return ingest.ApplyLedgerMetadata(streamRange, publisherConfig, ctx, fanOut)
}
// main runs a data pipeline that transforms Pubnet ledger metadata into
// payment events: it loads the datastore configuration from config.toml,
// wires inbound adapter -> payment transformer -> ZeroMQ publisher, and runs
// the pipeline until it ends or the process is interrupted.
func main() {
	// Only os.Interrupt is registered: os.Kill (SIGKILL) cannot be trapped,
	// so listing it has no effect and is flagged by static analysis.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	log.SetLevel(log.InfoLevel)

	cfg, err := toml.LoadFile("config.toml")
	if err != nil {
		fmt.Printf("config.toml should be accessible in current directory: %v\n", err)
		return
	}

	datastoreConfig := datastore.DataStoreConfig{}
	// Unmarshal TOML data into the Config struct
	if err = cfg.Unmarshal(&datastoreConfig); err != nil {
		fmt.Printf("error unmarshalling TOML config: %v\n", err)
		return
	}

	// create the inbound source of pubnet ledger metadata
	ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
		historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
		dataStoreConfig:    datastoreConfig,
	}

	// create the app transformer to convert network data to application data model
	appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

	// create the outbound adapter, this is the end point of the pipeline
	// publishes application data model as messages to a broker
	publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
	if err != nil {
		// A failure to create the publisher is an error, not an informational event.
		log.Errorf("error creating 0MQ publisher: %v\n", err)
		return
	}
	defer publisher.Destroy()
	outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

	// wire up the ingestion pipeline and let it run
	appTransformer.Subscribe(outboundAdapter)
	ledgerMetadataInboundAdapter.Subscribe(appTransformer)
	log.Infof("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}
config.toml
La configuración de CDP: este archivo define el almacén de datos que contiene los archivos de metadatos del ledger pregenerados. En este ejemplo se utiliza el bucket S3 de Public Blockchain para Stellar Pubnet.
- Ejemplo
# Datastore configuration loaded by main.go from config.toml.
# Kind of datastore that holds the ledger metadata files.
type = "S3"
# Location of the datastore: bucket path and region.
[params]
destination_bucket_path = "aws-public-blockchain/v1.1/stellar/ledgers/pubnet"
region = "us-east-2"
# this is the schema specific to the public s3 data lake being used
[schema]
ledgers_per_file = 1
files_per_partition = 64000
distributed_payment_subsciber.py
Un script de Python que demuestra cómo ahora tenemos procesamiento distribuido y una arquitectura impulsada por eventos, aprovechando el broker MQ para enviar el modelo de datos de aplicación de pagos derivado a otros microservicios. Asegúrate de instalar pyzmq con pip install pyzmq.
- Python
# Sample subscriber: connects to the pipeline's ZeroMQ publisher and prints
# every payment message it receives as pretty-printed JSON.
import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

try:
    print("Collecting payments from pipeline ...")
    socket.connect("tcp://127.0.0.1:5555")
    # An empty prefix subscribes to every published message.
    socket.subscribe("")

    while True:
        message = socket.recv()
        json_object = json.loads(message)
        json_formatted_str = json.dumps(json_object, indent=2)
        print(f"Received payment:\n\n{json_formatted_str}")
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the subscriber; exit without a traceback.
    print("Stopping payment subscriber.")
finally:
    # Release the socket and the context so the process shuts down cleanly.
    socket.close(linger=0)
    context.term()
Paso #2 - Compila y ejecuta el ejemplo del pipeline de ingestión.
- bash
go mod init example/pipeline
go get github.com/stellar/go-stellar-sdk@latest github.com/zeromq/goczmq@v4.1.0
go mod tidy
go build -o pipeline ./.
AWS_SHARED_CREDENTIALS_FILE=/dev/null ./pipeline
Paso #3 - Ejecuta el consumidor distribuido del pipeline.
En una terminal separada, ejecuta python distributed_payment_subsciber.py. Esto completa la topología distribuida del pipeline: el script recibe del proceso del pipeline los mensajes con la información de pagos y realiza un procesamiento adicional (imprimirlos en la consola).