Consume previously ingested events

Once events have been ingested into a database, for instance as done in the ingest guide, they can be consumed without needing to query Stellar RPC again. In the following, we will show how to consume these events.

Let's get started!

First, get some events in a DB

Continuing right where we left off in the ingest guide, we will use the ORM models to add a few more events.

from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)

Remember that events published by Soroban are XDR-encoded. We can use stellar-sdk to convert back and forth between native values and their XDR representation.

In the following, we will use a topic called transfer, and we will need some values and addresses. We can generate some test data:

import stellar_sdk

stellar_sdk.scval.to_symbol("transfer").to_xdr()
# 'AAAADwAAAAh0cmFuc2Zlcg=='
stellar_sdk.scval.to_int32(10_000).to_xdr()
# 'AAAABAAAJxA='
stellar_sdk.scval.to_int32(5_000).to_xdr()
# 'AAAABAAAE4g='
stellar_sdk.scval.to_int32(1_000).to_xdr()
# 'AAAABAAAA+g='
stellar_sdk.scval.to_address("GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH").to_xdr()
# 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='
stellar_sdk.scval.to_address("GAFYGBHKVFP36EOIRGG74V42F3ORAA2ZWBXNULMNDXAMMXQH5MCIGXXI").to_xdr()
# 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='
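
The conversion also works the other way around. For instance, here is a quick check using stellar_sdk.scval.to_native, the same helper we will use later in this guide, to turn an XDR string back into a native Python value:

stellar_sdk.scval.to_native("AAAADwAAAAh0cmFuc2Zlcg==")
# 'transfer'
stellar_sdk.scval.to_native("AAAABAAAJxA=")
# 10000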

Now we can make some events using our ORM and send them to the database:

from sqlalchemy.orm import sessionmaker

contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"
Session = sessionmaker(engine)
with Session.begin() as session:
    event_1 = Event(
        ledger=1,
        contract_id=contract_id,
        topics={
            # transfer
            "topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
            # GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH
            "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="
        },
        value="AAAABAAAJxA="
    )
    event_2 = Event(
        ledger=1,
        contract_id=contract_id,
        topics={
            # transfer
            "topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
            # GAFYGBHKVFP36EOIRGG74V42F3ORAA2ZWBXNULMNDXAMMXQH5MCIGXXI
            "topic_2": "AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM="
        },
        value="AAAABAAAE4g="
    )
    session.add_all([event_1, event_2])
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 1, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="}', 'AAAABAAAJxA=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 1, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM="}', 'AAAABAAAE4g=')
INFO sqlalchemy.engine.Engine COMMIT
info

Here, we are storing XDR-encoded values. We could instead have decided to store decoded values in the database. Since XDR is a compact encoding, choosing when to decode a value is a tradeoff between CPU usage and storage consumption.
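
For illustration, a hypothetical variant of the ingestion step that stores decoded values instead; this assumes the Event columns accept plain text, which is not how the model is used above:

# Hypothetical variant: decode at write time, so reads no longer need to.
decoded_event = Event(
    ledger=1,
    contract_id=contract_id,
    topics={
        "topic_1": stellar_sdk.scval.to_native("AAAADwAAAAh0cmFuc2Zlcg=="),  # 'transfer'
        "topic_2": "GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH",
    },
    value=str(stellar_sdk.scval.to_native("AAAABAAAJxA=")),  # '10000'
)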

Consuming events

Using the same model we used to ingest events into the database, we can query the database to iterate over all events present in the table.

import sqlalchemy
from sqlalchemy import orm

with orm.Session(engine) as session:
    stmt = sqlalchemy.select(Event)
    for event in session.scalars(stmt):
        print(event.topics, event.value)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT "SorobanEvent".id, "SorobanEvent".contract_id, "SorobanEvent".ledger, "SorobanEvent".topics, "SorobanEvent".value
FROM "SorobanEvent"
INFO sqlalchemy.engine.Engine [...] ()
...
['AAAADwAAAAh0cmFuc2Zlcg==', 'AAAAEgAAAAAAAAAAbskHLxwXdlUVH3X3pVMFqpLHYpwmDD/PoaqYnQkX7J4=', 'AAAAEgAAAAGqo5kAOYww4Z8QWIx9TqXkXUvjFUg8mpEbsg03vV+8/w==', 'AAAADgAAAAZuYXRpdmUAAA=='] AAAACgAAAAAAAAAAAAAAC6Q7dAA=
['AAAADwAAAAh0cmFuc2Zlcg==', 'AAAAEgAAAAAAAAAAL6/diRR4by9YIZCM/+O0/BGYKWlSn2CvTEiHBptJs+k=', 'AAAAEgAAAAGqo5kAOYww4Z8QWIx9TqXkXUvjFUg8mpEbsg03vV+8/w==', 'AAAADgAAAAZuYXRpdmUAAA=='] AAAACgAAAAAAAAAAAAAAAAvrwgA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='} AAAABAAAJxA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='} AAAABAAAE4g=
INFO sqlalchemy.engine.Engine ROLLBACK
note

Notice that the events ingested previously are also present and that their topics have a slightly different formatting. Even though we are using a schema, it is still easy to end up with inconsistent data in a database; this is only shown for demonstration purposes.

SQLAlchemy allows us to build more advanced queries. For example, we can filter events based on specific fields.

with orm.Session(engine) as session:
    stmt = sqlalchemy.select(Event).where(Event.ledger == 1)
    for event in session.scalars(stmt):
        print(event.topics, event.value)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine SELECT "SorobanEvent".id, "SorobanEvent".contract_id, "SorobanEvent".ledger, "SorobanEvent".topics, "SorobanEvent".value
FROM "SorobanEvent"
WHERE "SorobanEvent".ledger = ?
INFO sqlalchemy.engine.Engine [...] (1,)
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc='} AAAABAAAJxA=
{'topic_1': 'AAAADwAAAAh0cmFuc2Zlcg==', 'topic_2': 'AAAAEgAAAAAAAAAAC4ME6qlfvxHIiY3+V5ou3RADWbBu2i2NHcDGXgfrBIM='} AAAABAAAE4g=
INFO sqlalchemy.engine.Engine ROLLBACK
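
We can also combine several filters. For instance, here is a quick sketch, using the same Event model, that restricts the query to a given contract and orders the results by ledger:

with orm.Session(engine) as session:
    stmt = (
        sqlalchemy.select(Event)
        .where(Event.contract_id == contract_id)
        .where(Event.ledger >= 1)
        .order_by(Event.ledger)
    )
    for event in session.scalars(stmt):
        print(event.ledger, event.topics, event.value)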

Streaming events

Depending on our application, we might want to consume events periodically, polling the database to see if there is anything new, or fetch data on demand. There is another possibility: event listeners!

While we are at it, we can make the results more readable and usable in Python by using the conversion helpers provided by stellar-sdk.

@sqlalchemy.event.listens_for(Event, "after_insert")
def event_handler(mapper, connection, target):
    topics = target.topics
    value = stellar_sdk.scval.to_native(target.value)

    for key, topic in topics.items():
        topics[key] = stellar_sdk.scval.to_native(topic)

    print(f"Event listener: {topics} {value}")

The next time a record gets inserted into the database, this event handler will be called. Let's try it:

with Session.begin() as session:
    event_3 = Event(
        ledger=2,
        contract_id=contract_id,
        topics={
            # transfer
            "topic_1": "AAAADwAAAAh0cmFuc2Zlcg==",
            # GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH
            "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="
        },
        value="AAAABAAAJxA="
    )
    session.add(event_3)
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?)
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 2, '{"topic_1": "AAAADwAAAAh0cmFuc2Zlcg==", "topic_2": "AAAAEgAAAAAAAAAAP4aG3RBmnO85da+8tCofnE+D6D0hWzMe1bWQm1I2auc="}', 'AAAABAAAJxA=')
Event listener: {'topic_1': 'transfer', 'topic_2': <Address [type=ACCOUNT, address=GA7YNBW5CBTJZ3ZZOWX3ZNBKD6OE7A7IHUQVWMY62W2ZBG2SGZVOOPVH]>} 10000
INFO sqlalchemy.engine.Engine COMMIT

Congratulations, you are ready to consume events from Stellar RPC!

Going further

Using the techniques we just presented would probably be enough for a lot of use cases. Still, for readers wanting to go further, there are a few things to look into.

Asynchronous programming

So far, we have used SQLAlchemy in a synchronous way. If a backend endpoint were to call the database this way, it would block during the database call. SQLAlchemy supports asynchronous programming with the async and await keywords.

As a general rule, it's simpler to start with synchronous logic and add async support once everything works as expected. Debugging concurrent applications brings an additional layer of complexity.

SQLAlchemy allows you to switch from a synchronous session to an asynchronous one without changing your models or queries, which makes it very easy to use either one.
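
As a rough sketch of what this could look like, assuming the aiosqlite driver is installed and that the tables have been created on this engine, the query itself stays unchanged:

import asyncio

import sqlalchemy
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

async def main():
    # Same query as before; only the engine and session are asynchronous.
    async_engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    AsyncSession = async_sessionmaker(async_engine)
    async with AsyncSession() as session:
        stmt = sqlalchemy.select(Event).where(Event.ledger == 1)
        for event in await session.scalars(stmt):
            print(event.topics, event.value)

asyncio.run(main())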

Idempotency considerations

Depending on your application, you might want to look into the concept of idempotency, or simply put: guaranteeing that an event is processed only once.

For instance, if you use events for bookkeeping purposes in a payment application, processing the same event twice could result in a double spend. In such cases, you will want your system to be idempotent to guarantee that this scenario is covered.

There is a large body of technical literature around this topic and there is no one-size-fits-all solution. It might be enough for your application to add a column to the database to mark a message as processed, though you would need to account for network issues happening while you process a given event. Using SQLAlchemy with a database like PostgreSQL can help because, if done properly, operations can be made atomic, i.e. you can ensure that a certain chain of actions has been performed before committing a transaction to the database.
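
As one possible sketch of the "mark as processed" approach, assuming a hypothetical processed boolean column on the Event model, an atomic update can be used to claim an event before acting on it:

event_id = 1  # id of the event we are about to process

with Session.begin() as session:
    # Atomically flip the hypothetical `processed` flag; only one consumer
    # will see rowcount == 1 for a given event.
    result = session.execute(
        sqlalchemy.update(Event)
        .where(Event.id == event_id, Event.processed.is_(False))
        .values(processed=True)
    )
    if result.rowcount == 1:
        ...  # safe to process the event exactly once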

While researching this topic, you could look into message brokers like RabbitMQ or Kafka, to cite widely used solutions.