Saltar al contenido principal

Ingerir eventos publicados desde un contrato

Soroban RPC proporciona un método getEvents que te permite consultar eventos de un contrato inteligente. Sin embargo, la ventana de retención de datos para estos eventos es de 7 días como máximo. Si necesitas acceso a un registro de estos eventos más duradero, querrás "ingerir" los eventos a medida que se publican, manteniendo tu propio registro o base de datos a medida que se ingieren los eventos.

Hay muchas estrategias que puedes usar para ingerir y mantener los eventos publicados por un contrato inteligente. Entre las más simples podría estar utilizar una herramienta desarrollada por la comunidad como Mercury, que se encargará de todo el trabajo de infraestructura por una baja tarifa de suscripción.

Otro enfoque que exploraremos aquí es usar un trabajo cron para consultar Stellar RPC periódicamente y almacenar los eventos relevantes en una base de datos SQLite almacenada localmente. Vamos a usar un Mapeador Relacional de Objetos (ORM), que nos permitirá escribir consultas a la base de datos directamente en Python o JavaScript.

Configuración

En un entorno virtual, instala las dependencias de Python:

pip install sqlalchemy stellar-sdk

Configurar el Cliente de Base de Datos

Para acceder a la base de datos, vamos a usar SQLAlchemy, que es una biblioteca de Python frecuentemente utilizada para consultar bases de datos.

Vamos a ingerir eventos en una tabla llamada SorobanEvent. En SQLAlchemy, esto se traduce en una clase, también llamada modelo de base de datos:

from typing import Any
from sqlalchemy import orm, JSON


class Base(orm.DeclarativeBase):
# needed to tell SQLAlchemy to translate a dictionary into a JSON entry
type_annotation_map = {
dict[str, Any]: JSON,
}


class Event(Base):
__tablename__ = "SorobanEvent"

id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
contract_id: orm.Mapped[str]
ledger: orm.Mapped[int]
topics: orm.Mapped[dict[str, Any]]
value: orm.Mapped[str]

Usaremos una base de datos SQLite solo en memoria para esta guía, pero gracias al uso de un ORM, podríamos estar usando cualquier otra base de datos admitida. Simplemente tendríamos que cambiar la cadena de conexión.

from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)

# the following creates the table in the DB
Base.metadata.create_all(engine)
consejo

Al establecer echo=True, podemos entender lo que está sucediendo en la base de datos. La creación de la tabla de la base de datos genera los siguientes registros:

BEGIN (implicit)
PRAGMA main.table_info("SorobanEvent")
...
PRAGMA temp.table_info("SorobanEvent")
...
CREATE TABLE SorobanEvent (
id INTEGER NOT NULL,
contract_id VARCHAR NOT NULL,
ledger INTEGER NOT NULL,
topics JSON NOT NULL,
value VARCHAR NOT NULL,
PRIMARY KEY (id)
)
...
COMMIT
información

Usar un modelo de base de datos es muy conveniente ya que nos permite controlar el esquema de la base de datos programáticamente. Si necesitamos cambiar el esquema, por ejemplo, agregando nuevas columnas, entonces usar un ORM nos permite utilizar herramientas de migración muy poderosas.

Usaremos este modelo para crear y consultar los eventos almacenados en nuestra base de datos.

Consultar eventos desde Stellar RPC

Primero, necesitaremos consultar los eventos desde Stellar RPC. Este ejemplo simple realiza una solicitud RPC usando el método getEvents, filtrando todos los eventos de transfer que son emitidos por el contrato nativo XLM.

nota

Estamos haciendo algunas suposiciones aquí. Supondremos que tu contrato tiene suficiente actividad, y que estás consultando eventos con suficiente frecuencia para no estar en peligro de necesitar averiguar cuál es el ledger más antiguo que Stellar RPC conoce. El enfoque que estamos tomando es encontrar el número de secuencia del ledger más grande (más reciente) en la base de datos y consultar eventos comenzando desde allí. Tu caso de uso puede requerir algo de lógica para determinar cuál es el ledger más reciente y cuál es el ledger más antiguo disponible, etc.

Si comenzamos desde cero, no hay ledger conocido, así que podemos intentar ingerir aproximadamente los últimos 7 días asumiendo que un ledger se cierra cada 6s.

import stellar_sdk

soroban_server = stellar_sdk.SorobanServer()
ledger = soroban_server.get_latest_ledger().sequence-int(3600 / 6 * 24 * 7)

Más adelante, podremos empezar desde el ledger más recientemente ingerido haciendo una consulta a nuestra base de datos.

with orm.Session(engine) as session:
stmt = sqlalchemy.select(Event.ledger).where(
Event.contract_id == contract_id
).order_by(Event.ledger.desc())
ledger = session.scalars(stmt).first()

¡Obtengamos eventos de Stellar RPC!

from stellar_sdk.soroban_rpc import EventFilter, EventFilterType

contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"

res = soroban_server.get_events(
ledger,
filters=[
EventFilter(
event_type=EventFilterType.CONTRACT,
contract_ids=[contract_id],
topics=[["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
)
],
)
events = res.events

Almacenar eventos en la base de datos

Ahora, vamos a verificar si el objeto events contiene algún evento nuevo que debamos almacenar, y hacemos exactamente eso. Estamos almacenando los topics y valores del evento como cadenas codificadas en base64 aquí, pero podrías decodificar los topics y valores necesarios a los tipos de datos apropiados para tu caso de uso.

consejo

Facilita tu vida con un sessionmaker de SQLAlchemy al realizar transacciones (por ejemplo, agregar registros).

import sqlalchemy
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(engine)
with Session.begin() as session:
events_ = []
for event in events:
topic_ = event.topic
value = event.value
events_.append(Event(contract_id=contract_id, ledger=event.ledger, topics=topic_, value=value))
session.add_all(events_)
BEGIN (implicit)
INFO sqlalchemy.engine.Engine COMMIT
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3311, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3325, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
...
COMMIT

Ejecutar el Script con Cron

Una entrada cron es una excelente manera de automatizar este script para recopilar e ingerir eventos cada cierto tiempo. Puedes configurar este script para que se ejecute tan (in)frecuentemente como desees o necesites. Este ejemplo ejecutaría el script cada 24 horas a la 1:14 pm:

14 13 * * * python /absolute/path/to/script.py

Aquí hay otro ejemplo que ejecutaría el script cada 30 minutos:

30 * * * * python /absolute/path/to/script.py