Ingest events published from a contract

Soroban RPC provides a getEvents method which allows you to query events from a smart contract. However, the data retention window for these events is roughly 24 hours. If you need access to a longer-lived record of these events you'll want to "ingest" the events as they are published, maintaining your own record or database as events are ingested.

There are many strategies you can use to ingest and keep the events published by a smart contract. Among the simplest might be using a community-developed tool such as Mercury which will take all the infrastructure work off your plate for a low subscription fee.

Another approach, which we'll explore here, is to use a cron job to query Soroban RPC periodically and store the relevant events in a local SQLite database. We are going to use an Object Relational Mapper (ORM), which lets us write database queries directly in Python or JavaScript.

Setup

In a virtual environment, install the Python dependencies:

pip install sqlalchemy stellar-sdk

Set Up the Database Client

To access the database, we will use SQLAlchemy, a widely used Python library for querying databases.

We are going to ingest events in a table named SorobanEvent. In SQLAlchemy, this translates into a class, also called a database model:

from typing import Any
from sqlalchemy import orm, JSON


class Base(orm.DeclarativeBase):
    # needed to tell SQLAlchemy to translate a dictionary into a JSON entry
    type_annotation_map = {
        dict[str, Any]: JSON,
    }


class Event(Base):
    __tablename__ = "SorobanEvent"

    id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
    contract_id: orm.Mapped[str]
    ledger: orm.Mapped[int]
    topics: orm.Mapped[dict[str, Any]]
    value: orm.Mapped[str]

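We will use an in-memory-only SQLite database for this guide, but because we are using an ORM, we could use any other supported database simply by changing the connection string. Keep in mind that an in-memory database is discarded when the process exits, so a script run from cron needs a persistent database, for example a file-backed SQLite URL such as sqlite:///events.db.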

from sqlalchemy import create_engine
engine = create_engine("sqlite://", echo=True)

# the following creates the table in the DB
Base.metadata.create_all(engine)
tip

By setting echo=True we can understand what is happening on the database. Creating the database table leads to the following logs:

BEGIN (implicit)
PRAGMA main.table_info("SorobanEvent")
...
PRAGMA temp.table_info("SorobanEvent")
...
CREATE TABLE SorobanEvent (
    id INTEGER NOT NULL,
    contract_id VARCHAR NOT NULL,
    ledger INTEGER NOT NULL,
    topics JSON NOT NULL,
    value VARCHAR NOT NULL,
    PRIMARY KEY (id)
)
...
COMMIT
info

Using a database model is very convenient because it lets us control the database schema programmatically. If we need to change the schema, for instance by adding a new column, an ORM lets us use powerful migration tools.

We'll use this model to create and query for the events stored in our database.

Query Events from Soroban RPC

First, we'll need to query the events from Soroban RPC. This simple example makes an RPC request using the getEvents method, filtering for all transfer events that are emitted by the native XLM contract.

note

We are making some assumptions here. We'll assume that your contract sees enough activity, and that you are querying for events frequently enough that you aren't in danger of needing to figure out the oldest ledger Soroban RPC is aware of. The approach we're taking is to find the largest (most recent) ledger sequence number in the database and query for events starting there. Your use-case may require some logic to determine what the latest ledger is, and what the oldest ledger available is, etc.

If we start from scratch, there is no known ledger so we can try to ingest roughly the last 24 hours assuming a ledger closes every 6s.

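import stellar_sdk

# SorobanServer() with no arguments uses the SDK's default RPC URL
soroban_server = stellar_sdk.SorobanServer()
# 3600 / 6 ledgers per hour, times 24 hours
ledger = soroban_server.get_latest_ledger().sequence - int(3600 / 6 * 24)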
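
The window arithmetic can be made explicit. The following is a minimal sketch: the helper names `ledgers_in_window` and `backfill_start` are hypothetical, and the 6-second close time is the same rough assumption used in this guide rather than a guaranteed network constant.

```python
def ledgers_in_window(hours: float, close_time_s: float = 6.0) -> int:
    """Approximate number of ledgers closed in `hours`, assuming a fixed close time."""
    return int(hours * 3600 / close_time_s)


def backfill_start(latest_ledger: int, hours: float = 24, close_time_s: float = 6.0) -> int:
    """First ledger to request when nothing has been ingested yet."""
    # Never go below ledger 1 on a very young network.
    return max(1, latest_ledger - ledgers_in_window(hours, close_time_s))


print(ledgers_in_window(24))      # 14400
print(backfill_start(1_000_000))  # 985600
```

If ledgers close faster than assumed, the same 24 hours span more ledgers, so a shorter close time gives a more conservative (earlier) starting point.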

Later on, we will be able to start from the latest ingested ledger by making a query to our DB.

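import sqlalchemy

with orm.Session(engine) as session:
    # contract_id is defined in the next snippet
    stmt = sqlalchemy.select(Event.ledger).where(
        Event.contract_id == contract_id
    ).order_by(Event.ledger.desc())
    # first() returns None when nothing has been ingested yet
    ledger = session.scalars(stmt).first()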
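
Since the database query yields nothing on a first run, the two starting points can be combined into one decision. This is only a sketch under stated assumptions: `choose_start_ledger` is a hypothetical helper, and `oldest_available` stands for the oldest ledger the RPC server still retains, which you would need to obtain yourself.

```python
from typing import Optional


def choose_start_ledger(
    last_ingested: Optional[int],
    fallback_start: int,
    oldest_available: int,
) -> int:
    """Pick the ledger to start querying from.

    last_ingested:    highest ledger stored in the database, or None if empty
    fallback_start:   where to begin on a first run (e.g. latest minus ~24 hours)
    oldest_available: hypothetical input, the oldest ledger the RPC still serves
    """
    start = fallback_start if last_ingested is None else last_ingested
    # Requests older than the retention window cannot be served, so clamp.
    return max(start, oldest_available)


print(choose_start_ledger(None, 985_600, 985_000))     # 985600
print(choose_start_ledger(990_000, 985_600, 985_000))  # 990000
print(choose_start_ledger(900_000, 985_600, 985_000))  # 985000
```

In the third call the stored ledger is older than what the server retains, so the result is clamped. Any events between the two ledgers can no longer be fetched, which is worth logging in a real script.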

Let's get events from Soroban RPC!

from stellar_sdk.soroban_rpc import EventFilter, EventFilterType

contract_id = "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC"

res = soroban_server.get_events(
    ledger,
    filters=[
        EventFilter(
            event_type=EventFilterType.CONTRACT,
            contract_ids=[contract_id],
            topics=[["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]],
        )
    ],
)
events = res.events
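
The first topic in the filter, `AAAADwAAAAh0cmFuc2Zlcg==`, is the base64 XDR encoding of the symbol `transfer`. To show where that string comes from, here is a hand-rolled sketch using only the standard library; `symbol_topic` is a hypothetical helper, and in a real script the XDR utilities shipped with the SDK are likely the safer choice.

```python
import base64
import struct

SCV_SYMBOL = 15  # discriminant of the symbol arm in the SCVal XDR union


def symbol_topic(name: str) -> str:
    """Base64-encode `name` as an XDR SCVal symbol for use in a topic filter."""
    raw = name.encode("ascii")
    padding = b"\x00" * (-len(raw) % 4)  # XDR pads variable-length data to 4 bytes
    xdr = struct.pack(">II", SCV_SYMBOL, len(raw)) + raw + padding
    return base64.b64encode(xdr).decode("ascii")


print(symbol_topic("transfer"))  # AAAADwAAAAh0cmFuc2Zlcg==
```

The remaining `"*"` entries in the filter are wildcards that match any value in that topic position.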

Store Events in the Database

Next, we'll store every event returned in the events object. We're storing each event's topics and value as base64-encoded strings here, but you could decode them into the appropriate data types for your use-case.
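
As an illustration of what decoding could look like, the sketch below unpacks two of the base64 strings that appear in this guide by hand. It is deliberately limited: `decode_scval` is a hypothetical helper that handles only the symbol and i128 variants, and the SDK's own XDR helpers would normally be preferable.

```python
import base64
import struct

SCV_I128 = 10    # discriminants from the SCVal XDR union
SCV_SYMBOL = 15


def decode_scval(b64: str):
    """Decode a base64 SCVal; only the symbol and i128 arms are handled here."""
    raw = base64.b64decode(b64)
    (kind,) = struct.unpack_from(">I", raw, 0)
    if kind == SCV_SYMBOL:
        (length,) = struct.unpack_from(">I", raw, 4)
        return raw[8 : 8 + length].decode("ascii")
    if kind == SCV_I128:
        # i128 is a signed 64-bit high part followed by an unsigned 64-bit low part
        hi, lo = struct.unpack_from(">qQ", raw, 4)
        return (hi << 64) | lo
    raise ValueError(f"unsupported SCVal type {kind}")


print(decode_scval("AAAADwAAAAh0cmFuc2Zlcg=="))      # transfer
print(decode_scval("AAAACgAAAAAAAAAAAAAAAAAAAGQ="))  # 100
```

Storing decoded values makes the table easier to query, at the cost of committing to a schema for each event type you ingest.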

tip

A SQLAlchemy sessionmaker makes transactions, such as adding records, easier to manage.

import sqlalchemy
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(engine)
# Session.begin() commits on success and rolls back on error
with Session.begin() as session:
    events_ = []
    for event in events:
        topic_ = event.topic
        value = event.value
        events_.append(Event(contract_id=contract_id, ledger=event.ledger, topics=topic_, value=value))
    session.add_all(events_)
BEGIN (implicit)
INFO sqlalchemy.engine.Engine COMMIT
INFO sqlalchemy.engine.Engine BEGIN (implicit)
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3311, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
INFO sqlalchemy.engine.Engine INSERT INTO "SorobanEvent" (contract_id, ledger, topics, value) VALUES (?, ?, ?, ?) RETURNING id
INFO sqlalchemy.engine.Engine [...] ('CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC', 3325, '["AAAADwAAAAh0cmFuc2Zlcg==", "AAAAEgAAAAAAAAAAJY16rJOcKxQayCR7ayNA80hW5q1U4ypIGOY7NktBfKU=", "AAAAEgAAAAHXkotywnA8z+r365/0701QSlWouXn8m0UOoshCtNHOYQ==", "AAAADgAAAAZuYXRpdmUAAA=="]', 'AAAACgAAAAAAAAAAAAAAAAAAAGQ=')
...
COMMIT
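
One thing to watch for: resuming from the most recent stored ledger fetches that ledger's events again, so repeated runs can insert duplicates. The sketch below shows one possible guard. It uses the standard library's sqlite3 module rather than SQLAlchemy to stay short, and it assumes each event carries a unique identifier, stored in a hypothetical `event_id` column that is not part of the model above. The ids and the ledger 3330 are placeholder data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE SorobanEvent (
        event_id TEXT PRIMARY KEY,  -- hypothetical column: unique id of the event
        ledger INTEGER NOT NULL,
        value TEXT NOT NULL
    )
    """
)


def store(rows):
    """Insert (event_id, ledger, value) rows, skipping ids already stored."""
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT OR IGNORE INTO SorobanEvent VALUES (?, ?, ?)", rows)


value = "AAAACgAAAAAAAAAAAAAAAAAAAGQ="
store([("evt-1", 3311, value), ("evt-2", 3325, value)])
# The second run overlaps with the first on ledger 3325.
store([("evt-2", 3325, value), ("evt-3", 3330, value)])

count = conn.execute("SELECT COUNT(*) FROM SorobanEvent").fetchone()[0]
print(count)  # 3
```

The same idea carries over to the ORM model by adding a unique column and skipping or upserting rows that already exist.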

Run the Script with Cron

A cron entry is an excellent way to automate this script so that it gathers and ingests events on a schedule. You can configure it to run as (in)frequently as you want or need. This example runs the script once a day at 1:14 pm:

14 13 * * * python /absolute/path/to/script.py

Here's another example that will run the script every 30 minutes:

*/30 * * * * python /absolute/path/to/script.py
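
The first field of a cron entry is the minute. A bare number there fires once per hour at that minute, whereas a step value such as `*/30` fires every 30 minutes. The sketch below makes the difference visible; `minutes_matched` is a hypothetical helper that understands only the `*`, `N`, and `*/N` forms, not the full cron syntax.

```python
def minutes_matched(field: str) -> list[int]:
    """Expand a cron minute field limited to the `*`, `N`, and `*/N` forms."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        return list(range(0, 60, int(field[2:])))
    return [int(field)]


print(minutes_matched("14"))    # [14]
print(minutes_matched("30"))    # [30]
print(minutes_matched("*/30"))  # [0, 30]
```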