Ingerir eventos publicados desde un contrato
Soroban RPC proporciona un método getEvents
que te permite consultar eventos de un contrato inteligente. Sin embargo, la ventana de retención de datos para estos eventos es de 7 días como máximo. Si necesitas acceso a un registro de estos eventos más duradero, querrás "ingerir" los eventos a medida que se publican, manteniendo tu propio registro o base de datos a medida que se ingieren los eventos.
Hay muchas estrategias que puedes usar para ingerir y mantener los eventos publicados por un contrato inteligente. Entre las más simples podría estar utilizar una herramienta desarrollada por la comunidad como Mercury, que se encargará de todo el trabajo de infraestructura por una baja tarifa de suscripción.
Otro enfoque que exploraremos aquí es usar un trabajo cron para consultar Stellar RPC periódicamente y almacenar los eventos relevantes en una base de datos SQLite almacenada localmente. Vamos a usar un Mapeador Relacional de Objetos (ORM), que nos permitirá escribir consultas a la base de datos directamente en Python o JavaScript.
Configuración
- Python
En un entorno virtual, instala las dependencias de Python:
pip install sqlalchemy stellar-sdk
Configurar el Cliente de Base de Datos
- Python
- JavaScript
Para acceder a la base de datos, vamos a usar SQLAlchemy, que es una biblioteca de Python frecuentemente utilizada para consultar bases de datos.
Vamos a ingerir eventos en una tabla llamada SorobanEvent
. En SQLAlchemy, esto se traduce en una clase, también llamada modelo de base de datos:
from typing import Any
from sqlalchemy import orm, JSON
class Base(orm.DeclarativeBase):
# needed to tell SQLAlchemy to translate a dictionary into a JSON entry
type_annotation_map = {
dict[str, Any]: JSON,
}
class Event(Base):
__tablename__ = "SorobanEvent"
id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
contract_id: orm.Mapped[str]
ledger: orm.Mapped[int]
topics: orm.Mapped[dict[str, Any]]
value: orm.Mapped[str]
Usaremos una base de datos SQLite solo en memoria para esta guía, pero gracias al uso de un ORM, podríamos estar usando cualquier otra base de datos admitida. Simplemente tendríamos que cambiar la cadena de conexión.
from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)
# the following creates the table in the DB
Base.metadata.create_all(engine)
Al establecer echo=True
, podemos entender lo que está sucediendo en la base de datos. La creación de la tabla de la base de datos genera los siguientes registros:
BEGIN (implicit)
PRAGMA main.table_info("SorobanEvent")
...
PRAGMA temp.table_info("SorobanEvent")
...
CREATE TABLE SorobanEvent (
id INTEGER NOT NULL,
contract_id VARCHAR NOT NULL,
ledger INTEGER NOT NULL,
topics JSON NOT NULL,
value VARCHAR NOT NULL,
PRIMARY KEY (id)
)
...
COMMIT
Los detalles más finos de elegir una configuración de Prisma están más allá del alcance de este documento. Puedes obtener mucha más información en la guía rápida de Prisma. Aquí está el modelo del esquema de nuestro Prisma:
model SorobanEvent {
id String @id
ledger Int
contract_id String
topic_1 String?
topic_2 String?
topic_3 String?
topic_4 String?
value String
}
Usar un modelo de base de datos es muy conveniente ya que nos permite controlar el esquema de la base de datos programáticamente. Si necesitamos cambiar el esquema, por ejemplo, agregando nuevas columnas, entonces usar un ORM nos permite utilizar herramientas de migración muy poderosas.
Usaremos este modelo para crear y consultar los eventos almacenados en nuestra base de datos.
Consultar eventos desde Stellar RPC
Primero, necesitaremos consultar los eventos desde Stellar RPC. Este ejemplo simple realiza una solicitud RPC usando el método getEvents
, filtrando todos los eventos de transfer
que son emitidos por el contrato nativo XLM.
Estamos haciendo algunas suposiciones aquí. Supondremos que tu contrato tiene suficiente actividad, y que estás consultando eventos con suficiente frecuencia para no estar en peligro de necesitar averiguar cuál es el ledger más antiguo que Stellar RPC conoce. El enfoque que estamos tomando es encontrar el número de secuencia del ledger más grande (más reciente) en la base de datos y consultar eventos comenzando desde allí. Tu caso de uso puede requerir algo de lógica para determinar cuál es el ledger más reciente y cuál es el ledger más antiguo disponible, etc.
- Python
- JavaScript
Si comenzamos desde cero, no hay ledger conocido, así que podemos intentar ingerir aproximadamente los últimos 7 días asumiendo que un ledger se cierra cada 6s.
import stellar_sdk
soroban_server = stellar_sdk.SorobanServer()
ledger = soroban_server.get_latest_ledger().sequence-int(3600 / 6 * 24 * 7)
Más adelante, podremos empezar desde el ledger más recientemente ingerido haciendo una consulta a nuestra base de datos.
with orm.Session(engine) as session:
stmt = sqlalchemy.select(Event.ledger).where(
Event.contract_id == contract_id
).order_by(Event.ledger.desc())
ledger = session.scalars(stmt).first()
¡Obtengamos eventos de Stellar RPC!
from stellar_sdk.soroban_rpc import EventFilter, EventFilterType
contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"
res = soroban_server.get_events(
ledger,
filters=[
EventFilter(
event_type=EventFilterType.CONTRACT,
contract_ids=[contract_id],
topics=[["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
)
],
)
events = res.events
Usamos la biblioteca @stellar/stellar-sdk
:
import { SorobanRpc } from "@stellar/stellar-sdk";
import { PrismaClient } from "@prisma/client";
const server = new SorobanRpc.Server("https://soroban-testnet.stellar.org");
const prisma = new PrismaClient();
let latestEventIngested = await prisma.sorobanEvent.findFirst({
orderBy: [
{
ledger: "desc",
},
],
});
let events = await server.getEvents({
startLedger: latestEventIngested.ledger,
filters: [
{
type: "contract",
contractIds: ["CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"],
topics: [["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
},
],
});
Almacenar eventos en la base de datos
Ahora, vamos a verificar si el objeto events
contiene algún evento nuevo que debamos almacenar, y hacemos exactamente eso. Estamos almacenando los topics y valores del evento como cadenas codificadas en base64 aquí, pero podrías decodificar los topics y valores necesarios a los tipos de datos apropiados para tu caso de uso.
- Python
- JavaScript
Facilita tu vida con un sessionmaker de SQLAlchemy al realizar transacciones (por ejemplo, agregar registros).
import sqlalchemy
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
with Session.begin() as session:
events_ = []
for event in events:
topic_ = event.topic
value = event.value
events_.append(Event(contract_id=contract_id, ledger=event.ledger, topics=topic_, value=value))
session.add_all(events_)
BEGIN (implicit)
INFO sqlalchemy.engine.Engine COMMIT
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3311, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3325, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
...
COMMIT
if (events.events?.length) {
events.events.forEach(async (event) => {
await prisma.sorobanEvent.create({
data: {
id: event.id,
type: event.type,
ledger: event.ledger,
contract_id: event.contractId.toString(),
topic_1: event.topic[0].toXDR("base64") || null,
topic_2: event.topic[1].toXDR("base64") || null,
topic_3: event.topic[2].toXDR("base64") || null,
topic_4: event.topic[3].toXDR("base64") || null,
value: event.value.toXDR("base64"),
},
});
});
}
Ejecutar el Script con Cron
Una entrada cron es una excelente manera de automatizar este script para recopilar e ingerir eventos cada cierto tiempo. Puedes configurar este script para que se ejecute tan (in)frecuentemente como desees o necesites. Este ejemplo ejecutaría el script cada 24 horas a la 1:14 pm:
- Python
- JavaScript
14 13 * * * python /absolute/path/to/script.py
Aquí hay otro ejemplo que ejecutaría el script cada 30 minutos:
30 * * * * python /absolute/path/to/script.py
14 13 * * * node /absolute/path/to/script.js
Aquí hay otro ejemplo que ejecutaría el script cada 30 minutos:
30 * * * * node /absolute/path/to/script.js
Guías en esta categoría:
📄️ Consumir eventos previamente ingeridos
Consumir eventos ingeridos sin consultar de nuevo el RPC
📄️ Ingerir eventos publicados desde un contrato
Usar el método getEvents de Stellar RPC para consultar eventos, con una ventana de retención de 7 días
📄️ Publicar eventos desde un contrato de Rust
Publicar eventos desde un contrato de Rust