PostgresConsumerStateStore
The PostgresConsumerStateStore class within the Evento Framework extends ConsumerStateStore and provides a robust solution for storing and managing consumer state information in a PostgreSQL database. This chapter explores the functionalities and usage of PostgresConsumerStateStore for persistent state handling within Evento applications.
Persistent Storage:
Leverages PostgreSQL, a relational database management system, to store consumer state data.
This enables data persistence across application restarts, ensuring that saga state and consumer progress are preserved.
Ideal for production environments where data loss prevention is critical.
Key Methods:
Constructors:
The primary constructor accepts an
EventoServerinstance, aPerformanceServiceinstance, aConnectionobject for connecting to the PostgreSQL database, anObjectMapperfor serialization/deserialization, and anExecutorfor handling observer execution concurrently.
init(): This method is crucial and should be called once before using the consumer state store. It executes Data Definition Language (DDL) statements to create the necessary tables (evento__consumer_stateandevento__saga_state) within the PostgreSQL database if they don't already exist.removeSagaState(Long sagaId): Deletes the state associated with a specific saga identified by its ID.leaveExclusiveZone(String consumerId): Releases the advisory lock on the consumer, signifying it has exited the exclusive zone.enterExclusiveZone(String consumerId): Attempts to acquire an advisory lock for the consumer using PostgreSQL'spg_advisory_lockfunction. This ensures exclusive access to the consumer's state during processing.getLastEventSequenceNumber(String consumerId): Retrieves the last processed event sequence number for a consumer from theevento__consumer_statetable.setLastEventSequenceNumber(String consumerId, Long eventSequenceNumber): Updates the last processed event sequence number for a consumer in theevento__consumer_statetable.getSagaState(String sagaName, String associationProperty, String associationValue): Retrieves the stored state for a saga identified by its name, association property, and association value. It utilizes a JSON path query within the PostgreSQL database to efficiently locate the relevant saga state.setSagaState(Long id, String sagaName, SagaState sagaState): Sets the state for a saga. If an ID is provided (indicating an existing saga), it updates the state for that specific saga. Otherwise, it inserts a new entry for the saga into theevento__saga_statetable.
Usage Example:
The provided code snippet demonstrates how to configure PostgresConsumerStateStore within an Evento application using a lambda expression within the setConsumerStateStoreBuilder method:
EventoBundle.Builder.builder()
.setBasePackage(DemoSagaApplication.class.getPackage())
.setConsumerStateStoreBuilder(((eventoServer, performanceService) -> {
return new PostgresConsumerStateStore(
eventoServer,
performanceService,
connection); // Connection Object
}))
...
.start();In this example, a Connection object (representing the connection to the PostgreSQL database) is provided during the consumer state store creation. This approach offers flexibility in managing database connections.
Important Considerations:
PostgresConsumerStateStorerelies on a PostgreSQL database for storage. Ensure you have a properly configured PostgreSQL instance accessible to your Evento application.Remember to call the
init()method to create the necessary tables before using the consumer state store.PostgreSQL offers strong consistency guarantees for data persistence.
By utilizing PostgresConsumerStateStore, you can ensure that your Evento application maintains a persistent record of consumer state and saga data across restarts. This is essential for production environments where data integrity and reliability are paramount.
Last updated