Saltar al contenido principal

Consumir eventos previamente ingeridos

Una vez que los eventos han sido ingeridos en una base de datos, por ejemplo, como se hace en la guía de ingestión, pueden ser consumidos sin que sea necesario consultar nuevamente el RPC de Stellar. A continuación, mostraremos cómo podemos consumir estos eventos.

¡Vamos a comenzar!

Primero, obtén algunos eventos en una base de datos

Siguiendo justo donde lo dejamos en la guía de ingestión, utilizaremos los modelos ORM para añadir varias más eventos.

from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)

Recuerda que los eventos publicados por Soroban están codificados en XDR. Podemos utilizar stellar-sdk para convertir entre valores y la representación XDR.

A continuación, usaremos un tema llamado transfer y necesitaremos algunos valores y direcciones. Podemos generar algunos datos de prueba:

import stellar_sdk

stellar_sdk.scval.to_symbol("transfer").to_xdr()
# 'AAAADwAAAAh0cmFuc2Zlcg=='
stellar_sdk.scval.to_int32(10_000).to_xdr()
# 'AAAABAAAJxA='
stellar_sdk.scval.to_int32(5_000).to_xdr()
# 'AAAABAAAE4g='
stellar_sdk.scval.to_int32(1_000).to_xdr()
# 'AAAABAAAA+g='
stellar_sdk.scval.to_address("GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH").to_xdr()
# 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='
stellar_sdk.scval.to_address("GAFYGBHKVFP36EOIRGG74V42F3ORAA2ZWBXNULMNDXAMMXQH5MCIGXXI").to_xdr()
# 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='

Ahora podemos crear algunos eventos usando nuestro ORM y enviarlos a la base de datos:

from sqlalchemy.orm import sessionmaker

contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"
Session = sessionmaker(engine)
with Session.begin() as session:
event_1 = Event(
ledger=1,
contract_id=contract_id,
topics={
# transfer
"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
# GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH
"topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="
},
value="AAAABAAAJxA="
)
event_2 = Event(
ledger=1,
contract_id=contract_id,
topics={
# transfer
"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
# GAFYGBHKVFP36EOIRGG74V42F3ORAA2ZWBXNULMNDXAMMXQH5MCIGXXI
"topic_2": "AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM="
},
value="AAAABAAAE4g="
)
session.add_all([event_1, event_2])
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 1, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="}', 'AAAABAAAJxA=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 1, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM="}', 'AAAABAAAE4g=')
INFO sqlalchemy.engine.Engine COMMIT
información

Aquí, estamos almacenando valores codificados en XDR. En su lugar, podríamos haber decidido almacenar valores decodificados en la base de datos. Dado que XDR es un formato comprimido, elegir cuándo decodificar el valor es un compromiso entre el uso de CPU y el consumo de memoria.

Consumiendo eventos

Utilizando el mismo modelo que usamos para ingerir eventos en la base de datos, podemos consultar la base de datos para iterar sobre todos los eventos presentes en la tabla.

import sqlalchemy
from sqlalchemy import orm

with orm.Session(engine) as session:
stmt = sqlalchemy.select(Event)
for event in session.scalars(stmt):
print(event.topics, event.value)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT "SorobanEvent".id, "SorobanEvent".contract_id, "SorobanEvent".ledger, "SorobanEvent".topics, "SorobanEvent".value
FROM "SorobanEvent"
INFO sqlalchemy.engine.Engine [...] ()
...
['AAAADwAAAAh0cmFuc2Zlcg==', 'AAAAEgAAAAAAAAAAbskHLxwXdlUVH3X3pVMFqpLHYpwmDD/PoaqYnQkX7J4=', 'AAAAEgAAAAGqo5kAOYww4Z8QWIx9TqXkXUvjFUg8mpEbsg03vV+8/w==', 'AAAADgAAAAZuYXRpdmUAAA=='] AAAACgAAAAAAAAAAAAAAC6Q7dAA=
['AAAADwAAAAh0cmFuc2Zlcg==', 'AAAAEgAAAAAAAAAAL6/diRR4by9YIZCM/+O0/BGYKWlSn2CvTEiHBptJs+k=', 'AAAAEgAAAAGqo5kAOYww4Z8QWIx9TqXkXUvjFUg8mpEbsg03vV+8/w==', 'AAAADgAAAAZuYXRpdmUAAA=='] AAAACgAAAAAAAAAAAAAAAAvrwgA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='} AAAABAAAJxA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='} AAAABAAAE4g=
INFO sqlalchemy.engine.Engine ROLLBACK
nota

Observa cómo los eventos anteriores están presentes y tienen un formato ligeramente diferente. Aunque estamos utilizando un esquema, todavía es fácil corromper una base de datos. Esto solo se muestra con fines de demostración.

SQLAlchemy permite realizar consultas avanzadas. Por ejemplo, podríamos filtrar un solo evento basado en algunos campos específicos.

with orm.Session(engine) as session:
stmt = sqlalchemy.select(Event).where(Event.ledger == 1)
for event in session.scalars(stmt):
print(event.topics, event.value)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT "SorobanEvent".id, "SorobanEvent".contract_id, "SorobanEvent".ledger, "SorobanEvent".topics, "SorobanEvent".value
FROM "SorobanEvent"
WHERE "SorobanEvent".ledger = ?
INFO sqlalchemy.engine.Engine [...] (1,)
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='} AAAABAAAJxA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='} AAAABAAAE4g=
INFO sqlalchemy.engine.Engine ROLLBACK

Transmitiendo eventos

Dependiendo de nuestra aplicación, podríamos querer consumir eventos periódicamente llamando a la base de datos para ver si hay algo nuevo. O obtener datos según lo necesite nuestra aplicación. Hay otra posibilidad: ¡oyentes de eventos!

Mientras tanto, podemos hacer que los resultados sean más legibles o utilizables en Python utilizando el asistente de conversión proporcionado por stellar-sdk.

@sqlalchemy.event.listens_for(Event, "after_insert")
def event_handler(mapper, connection, target):
topics = target.topics
value = stellar_sdk.scval.to_native(target.value)

for key, topic in topics.items():
topics[key] = stellar_sdk.scval.to_native(topic)

print(f"Event listener: {topics} {value}")

La próxima vez que se inserte un registro en la base de datos, se llamará a este handler de eventos. Intenta esto:

with Session.begin() as session:
event_3 = Event(
ledger=2,
contract_id=contract_id,
topics={
# transfer
"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
# GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH
"topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="
},
value="AAAABAAAJxA="
)
session.add(event_3)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?)
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 2, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="}', 'AAAABAAAJxA=')
Event listener: {'topic_1': 'transfer', 'topic_2': <Address [type=ACCOUNT, address=GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH]>} 10000
INFO sqlalchemy.engine.Engine COMMIT

¡Felicidades, estás listo para consumir eventos de Stellar RPC!

Avanzando

Utilizar las técnicas que acabamos de presentar probablemente sería suficiente para muchos casos de uso. Sin embargo, para los lectores que quieren avanzar, hay algunas cosas en las que mirar.

Programación asincrónica

Hasta ahora, hemos utilizado SQLAlchemy de manera sincrónica. Si tuviéramos un endpoint en el backend llamando a la base de datos, este endpoint se bloquearía durante la llamada a la base de datos. SQLAlchemy admite la programación asincrónica con las palabras clave async y await.

Como reflexión general, es más simple comenzar con una lógica sincrónica y luego pasar a añadir soporte para asincrónica cuando todo funcione como se espera. Depurar aplicaciones concurrentes trae una capa adicional de complejidad.

SQLAlchemy te permite cambiar simplemente de una sesión sincrónica a una asincrónica sin necesidad de cambiar tus modelos ni consultas, lo que hace que sea una tarea muy fácil usar uno u otro.

Consideraciones de idempotencia

Dependiendo de tu aplicación, podrías querer investigar el concepto de idempotencia. O dicho de otra manera: garantizar que un evento sea consumido solo una vez.

Por ejemplo, si usas eventos para fines contables en una aplicación de pagos, procesar dos veces el mismo evento podría resultar en un doble gasto. En tales casos, querrás que tu sistema sea idempotente para garantizar que este escenario esté cubierto.

Hay una gran cantidad de literatura técnica sobre este tema y no hay una solución única que funcione para todos. Podría ser suficiente que tu aplicación agregue una columna en la base de datos para marcar un mensaje como procesado, aunque necesitarías tener en cuenta los problemas de red que pueden ocurrir mientras procesas un evento determinado. Usar SQLAlchemy con una base de datos como PostgreSQL podría ayudar, ya que si se hace correctamente, las operaciones pueden hacerse atómicas. Es decir, puedes asegurarte de que una cierta cadena de acciones ha sido realizada antes de comprometer una transacción a la base de datos.

Mientras investigas este tema, podrías buscar brokers de mensajes como RabbitMQ o Kafka, para citar soluciones ampliamente utilizadas.