BufferedStorageBackend
BufferedStorageBackend is a ledger backend in the Stellar ingest SDK that retrieves ledger metadata from a cloud-based data lake, typically populated by Galexie. While Galexie currently supports only GCS, BufferedStorageBackend is designed to work with any datastore that implements the datastore interface. It returns ledger metadata in XDR format.
Key Features
- Parallel Downloads: Downloads multiple ledgers concurrently and buffers them in memory for fast access. This is particularly useful for fetching large historical ledger ranges.
- Schema-Aware: Reads multi-ledger files based on the datastore schema, extracting one ledger at a time.
- Automatic Retries: Handles request failures by retrying failed requests.
- XDR Output: Returns ledger metadata in XDR format, enabling easy integration with other packages in the ingest SDK (e.g., processors); see the sketch after this list.
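As a minimal, hedged sketch of that integration, a LedgerCloseMeta returned by the backend can be handed directly to the ingest package's transaction reader. The pubnet passphrase and the surrounding error handling here are illustrative assumptions, not part of this backend's API:
// Sketch: iterating over transactions in a LedgerCloseMeta returned by
// GetLedger. Uses github.com/stellar/go/ingest, github.com/stellar/go/network,
// and the standard io package.
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(network.PublicNetworkPassphrase, ledgerCloseMeta)
if err != nil {
	log.Fatal(err)
}
defer txReader.Close()

for {
	tx, err := txReader.Read()
	if err == io.EOF {
		break // all transactions in this ledger have been read
	}
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("transaction hash: %s", tx.Result.TransactionHash.HexString())
}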
Prerequisites
Installation & Setup
- Run Galexie to export ledger data to GCS cloud storage. Follow the Galexie admin guide for instructions on running Galexie.
- For purposes of the example code, ensure access to a data lake populated by Galexie, configured as a GCS bucket. For instructions on creating a data lake, refer to the Galexie admin guide.
Configuration
Datastore Configuration
Configure the datastore to match the schema used during the Galexie export. The schema defines how many ledgers are stored in each file and how many files make up each partition.
// Datastore configuration structure
type DataStoreConfig struct {
	Type   string            `toml:"type"`   // Data storage type (e.g., GCS)
	Params map[string]string `toml:"params"` // Configuration parameters for the datastore
	Schema DataStoreSchema   `toml:"schema"` // Defines the ledger storage schema
}
Example Configuration
datastoreConfig := datastore.DataStoreConfig{
	Type: "GCS", // Using Google Cloud Storage as the backend
	Params: map[string]string{
		"destination_bucket_path": "your-gcs-bucket/data", // GCS bucket path to the data
	},
	Schema: datastore.DataStoreSchema{
		LedgersPerFile:    1,     // 1 ledger per file
		FilesPerPartition: 64000, // Number of files per partition
	},
}
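To sanity-check that the configured schema matches the export layout, you can ask it for the object key that should contain a given ledger. This is a minimal sketch; the GetObjectKeyFromSequenceNumber helper is assumed from recent versions of the datastore package:
// Compute the object key (partition/file path) that holds ledger 1000 under
// this schema, relative to the bucket path configured above.
objectKey := datastoreConfig.Schema.GetObjectKeyFromSequenceNumber(1000)
log.Printf("ledger 1000 lives in object: %s", objectKey)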
BufferedStorageBackend Configuration
Configure the BufferedStorageBackend to control download concurrency, buffering, and retry behavior.
// BufferedStorageBackend configuration structure
type BufferedStorageBackendConfig struct {
	BufferSize uint32        `toml:"buffer_size"` // Number of files to buffer in memory
	NumWorkers uint32        `toml:"num_workers"` // Number of concurrent workers for downloading ledgers
	RetryLimit uint32        `toml:"retry_limit"` // Number of retry attempts on failure
	RetryWait  time.Duration `toml:"retry_wait"`  // Time to wait between retry attempts
}
Example Configuration
// BufferedStorageBackend configuration instance
backendConfig := ledgerbackend.BufferedStorageBackendConfig{
	BufferSize: 100,             // Buffer up to 100 files in memory
	NumWorkers: 10,              // 10 parallel download workers
	RetryLimit: 3,               // Retry up to 3 times on failure
	RetryWait:  5 * time.Second, // Wait 5 seconds between retries
}
You can specify these values individually or use the default configuration. The default settings automatically adjust the number of parallel download workers and buffer size based on the object size (ledger count per file). These values are based on empirical testing, but the optimal configuration may vary depending on hardware and network conditions.
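For example, here is a hedged sketch of relying on the defaults instead; it assumes the DefaultBufferedStorageBackendConfig helper available in recent versions of the SDK:
// Derive the worker count and buffer size from the number of ledgers stored
// per file, matching the datastore schema used during the export.
backendConfig := ledgerbackend.DefaultBufferedStorageBackendConfig(datastoreConfig.Schema.LedgersPerFile)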
Usage
BufferedStorageBackend can be used both for batch processing of a historical ledger range and for fetching new ledgers in real time as they become available.
Here is sample code that uses BufferedStorageBackend for batch processing of a historical ledger range.
package main

import (
	"context"
	"log"
	"time"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
	"github.com/stellar/go/support/errors"
)

func main() {
	ctx := context.Background()

	// Configure the datastore
	datastoreConfig := datastore.DataStoreConfig{
		Type: "GCS", // Google Cloud Storage as the backend
		Params: map[string]string{
			"destination_bucket_path": "your-gcs-bucket/data", // Replace with actual GCS bucket path
		},
		Schema: datastore.DataStoreSchema{
			LedgersPerFile:    1,     // Defines how many ledgers per file
			FilesPerPartition: 64000, // Defines partition size
		},
	}

	// Initialize the datastore
	dataStore, err := datastore.NewDataStore(ctx, datastoreConfig)
	if err != nil {
		log.Fatal(errors.Wrap(err, "failed to create datastore"))
	}
	defer dataStore.Close()

	// Configure the BufferedStorageBackend
	backendConfig := ledgerbackend.BufferedStorageBackendConfig{
		BufferSize: 100,             // Number of files to buffer in memory
		NumWorkers: 10,              // Concurrent download workers
		RetryLimit: 3,               // Maximum retry attempts on failure
		RetryWait:  5 * time.Second, // Wait time between retries
	}

	// Initialize the backend
	backend, err := ledgerbackend.NewBufferedStorageBackend(backendConfig, dataStore)
	if err != nil {
		log.Fatal(errors.Wrap(err, "failed to create buffered storage backend"))
	}
	defer backend.Close()

	// Define the ledger range to process
	ledgerRange := ledgerbackend.BoundedRange(1000, 2000)

	// Prepare the range before calling GetLedger; GetLedger returns an error
	// if the session has not been prepared.
	if err := backend.PrepareRange(ctx, ledgerRange); err != nil {
		log.Fatal(errors.Wrap(err, "failed to prepare ledger range"))
	}

	log.Printf("Starting ledger retrieval for range: %d - %d", ledgerRange.From(), ledgerRange.To())

	// Iterate through the ledger sequence
	for ledgerSeq := ledgerRange.From(); ledgerSeq <= ledgerRange.To(); ledgerSeq++ {
		ledgerCloseMeta, err := backend.GetLedger(ctx, ledgerSeq)
		if err != nil {
			log.Printf("Warning: Failed to retrieve ledger %d: %v", ledgerSeq, err)
			continue
		}
		log.Printf("Successfully retrieved ledger %d", ledgerCloseMeta.LedgerSequence())

		// Add your logic to process the XDR data
		// Example: Parsing transactions, operations, etc.
	}

	log.Println("Ledger retrieval process completed successfully.")
}
For real-time streaming of new ledgers using BufferedStorageBackend, refer to the Ingestion Pipeline Code.
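As a rough sketch (not the full pipeline code), streaming differs from the batch example above mainly in preparing an unbounded range, after which GetLedger blocks until each new ledger object appears in the data lake. The ctx and backend values are assumed to be set up as in the batch example, and startSeq is a hypothetical starting sequence:
// Hedged sketch: tailing new ledgers with an unbounded range.
if err := backend.PrepareRange(ctx, ledgerbackend.UnboundedRange(startSeq)); err != nil {
	log.Fatal(errors.Wrap(err, "failed to prepare unbounded range"))
}
for ledgerSeq := startSeq; ; ledgerSeq++ {
	// Blocks until the requested ledger is available in the data lake.
	ledgerCloseMeta, err := backend.GetLedger(ctx, ledgerSeq)
	if err != nil {
		log.Fatal(errors.Wrap(err, "failed to retrieve ledger"))
	}
	log.Printf("Received new ledger %d", ledgerCloseMeta.LedgerSequence())
}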