Saltar al contenido principal

AlmacenamientoEnCaché

BufferedStorageBackend es un backend de ledger en Stellar ingest SDK que recupera los metadatos del ledger desde un lago de datos basado en la nube, típicamente completado por Galexie. Mientras que Galaxie actualmente admite solo GCS, BufferedStorageBackend está diseñado para funcionar con cualquier datastore que implemente la interfaz de datastore. Devuelve metadatos del ledger en formato XDR.

Características Clave

  • Descargas Paralelas: Descarga múltiples ledgers de forma concurrente y los almacena en memoria para un acceso rápido. Esto es particularmente útil para obtener grandes rangos históricos de ledger.
  • Consciente del Esquema: Lee archivos de múltiples ledgers basados en el esquema del datastore, extrayendo un ledger a la vez.
  • Reintentos Automáticos: Maneja las fallas de las solicitudes volviendo a intentar las solicitudes fallidas.
  • Salida XDR: Devuelve metadatos del ledger en formato XDR, lo que permite una fácil integración con otros paquetes en el SDK de ingest (por ejemplo, procesadores).

Requisitos Previos

Instalación y Configuración

  • Ejecuta Galaxie para exportar datos del ledger al almacenamiento en la nube de GCS. Sigue la guía de administración de Galaxie para instrucciones sobre cómo ejecutar Galaxie.

  • Para los propósitos del código de ejemplo, asegúrate de tener acceso a un lago de datos completado por Galaxie, configurado como un bucket de GCS. Para instrucciones sobre cómo crear un lago de datos, consulta la guía del administrador de Galaxie .

Configuración

Configuración del Datastore

Configura el datastore para igualar el esquema utilizado durante la exportación de Galexie. Este esquema define cuántos ledgers por archivo y cuántos archivos por partición.

// Datastore configuration structure
type DataStoreConfig struct {
Type string `toml:"type"` // Data storage type (e.g., GCS)
Params map[string]string `toml:"params"` // Configuration parameters for the datastore
Schema DataStoreSchema `toml:"schema"` // Defines the ledger storage schema
}

Ejemplo de Configuración

datastoreConfig := datastore.DataStoreConfig{
Type: "GCS", // Using Google Cloud Storage as the backend
Params: map[string]string{
"destination_bucket_path": "your-gcs-bucket/data", // GCS bucket path to the data
},
Schema: datastore.DataStoreSchema{
LedgersPerFile: 1, // 1 ledger per file
FilesPerPartition: 64000, // Number of files per partition
},
}

Configuración de BufferedStorageBackend

Configura el BufferedStorageBackend para controlar la concurrencia de descargas, el almacenamiento en búfer y el comportamiento de reintento.

// BufferedStorageBackend configuration structure
type BufferedStorageBackendConfig struct {
BufferSize uint32 `toml:"buffer_size"` // Number of files to buffer in memory
NumWorkers uint32 `toml:"num_workers"` // Number of concurrent workers for downloading ledgers
RetryLimit uint32 `toml:"retry_limit"` // Number of retry attempts on failure
RetryWait time.Duration `toml:"retry_wait"` // Time to wait between retry attempts
}

Ejemplo de Configuración

// BufferedStorageBackend configuration instance
backendConfig := ledgerbackend.BufferedStorageBackendConfig{
BufferSize: 100, // Buffer upto 100 files in memory
NumWorkers: 10, // 10 parallel download workers
RetryLimit: 3, // Retry up to 3 times on failure
RetryWait: 5 * time.Second, // Wait 5 seconds between retries

}

Puedes especificar estos valores individualmente o usar la configuración predeterminada. La configuración predeterminada ajusta automáticamente el número de trabajadores de descarga paralelos y el tamaño del buffer según el tamaño del objeto (número de ledgers por archivo). Estos valores se basan en pruebas empíricas, pero la configuración óptima puede variar según las condiciones de hardware y red.

Uso

BufferedStorageBackend se puede usar para el procesamiento por lotes de un rango histórico de ledgers así como para obtener nuevos ledgers en tiempo real a medida que están disponibles.

Aquí hay un código de ejemplo que utiliza BufferedStorageBackend para el procesamiento por lotes de rangos históricos de ledger.

package main

import (
"context"
"log"
"time"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/errors"
)

func main() {
ctx := context.Background()

// Configure the datastore
datastoreConfig := datastore.DataStoreConfig{
Type: "GCS", // Google Cloud Storage as the backend
Params: map[string]string{
"destination_bucket_path": "your-gcs-bucket/data", // Replace with actual GCS bucket path
},
Schema: datastore.DataStoreSchema{
LedgersPerFile: 1, // Defines how many ledgers per file
FilesPerPartition: 64000, // Defines partition size
},
}

// Initialize the datastore
dataStore, err := datastore.NewDataStore(ctx, datastoreConfig)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to create datastore"))
}
defer dataStore.Close()

// Configure the BufferedStorageBackend
backendConfig := ledgerbackend.BufferedStorageBackendConfig{
BufferSize: 100, // Number of files to buffer in memory
NumWorkers: 10, // Concurrent download workers
RetryLimit: 3, // Maximum retry attempts on failure
RetryWait: 5 * time.Second, // Wait time between retries
}

// Initialize the backend
backend, err := ledgerbackend.NewBufferedStorageBackend(backendConfig, dataStore)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to create buffered storage backend"))
}
defer backend.Close()

// Define the ledger range to process
ledgerRange := ledgerbackend.BoundedRange(1000, 2000)

log.Printf("Starting ledger retrieval for range: %d - %d", ledgerRange.From(), ledgerRange.To())

// Iterate through the ledger sequence
for ledgerSeq := ledgerRange.From(); ledgerSeq <= ledgerRange.To(); ledgerSeq++ {
ledgerCloseMeta, err := backend.GetLedger(ctx, ledgerSeq)
if err != nil {
log.Printf("Warning: Failed to retrieve ledger %d: %v", ledgerSeq, err)
continue
}

log.Printf("Successfully retrieved ledger %d. Ledger sequence: %d", ledgerSeq, ledgerCloseMeta.LedgerSequence())

// Add your logic to process the XDR data
// Example: Parsing transactions, operations, etc.
}

log.Println("Ledger retrieval process completed successfully.")
}

Para la transmisión en tiempo real de nuevos ledgers usando BufferedStorageBackend, consulta el Código de la Canalización de Ingesta.