Configuration

Spring Boot

Revents provide Spring Boot autoconfiguration by its revents-spring-boot-autoconfigure module. In order to use R2dbc based EventStore and SnapshotRepository, and MongoDB based TokenStore, put the following configuration into your application.yml:

revents:
  scanPackage: com.my.package
  mongo:
    token-store:
      enabled: true
  r2dbc:
    snapshot-repository:
      enabled: true
    event-store:
      enabled: true

Revents will scan the given package for classes annotated with @Aggregate and @SerializedType.

All the beans provided by Revents can be overridden.

Bean overrides

Command message interceptors

commandMessageInterceptors bean is a list of MessageInterceptor<CommandMessage<?>, CommandResult> objects. If you would like to use javax.validation to validate command messages and also want to store events created by aggregate then define it in a @Configuration class:

@Bean("commandMessageInterceptors")
@ConditionalOnMissingBean(name = "commandMessageInterceptors")
List<MessageInterceptor<CommandMessage<?>, CommandResult>> commandMessageInterceptors(
    Validator validator,
    EventStore eventStore) {

    return List.of(
        new ValidationMessageInterceptor<>(validator),
        new EventStoringCommandInterceptor(eventStore));
}
Event message dispatch interceptors

You might also want to execute all MongoDB statements inside a transaction, including the token storing ones triggered by the event acknowledge. This can be done with the following:

@Bean("eventMessageDispatchInterceptors")
Map<EventProcessor.EventProcessorId, List<MessageInterceptor<EventMessage<?>, Void>>> eventMessageDispatchInterceptors(
    MongoClient client) {

    return Map.of(EventProcessor.EventProcessorId.DEFAULT, List.of(
        new MongoTransactionalMessageInterceptor<>(client),
        new EventMessageAcknowledgeInterceptor()));
}

Note, that this wraps the whole event dispatch process into 1 MongoDB transaction, regardless of the number of event handlers executed.

Initializing the SQL schema
@Bean
ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
    ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
    initializer.setConnectionFactory(connectionFactory);
    CompositeDatabasePopulator populator = new CompositeDatabasePopulator();
    populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("revents-db-schema.sql")));
    initializer.setDatabasePopulator(populator);
    return initializer;
}

Java configuration

Using Spring Boot is not mandatory, configuring Revents is possible with Java configuration.

EventSourcingIt.java contains a simple, in-memory based setup:

Event store contains all the events:

var eventStore = new InMemoryEventStore(new InMemoryTokenStore());

Aggregate loader can load existing aggregates from a persistent storage:

var aggregateLoader = EventSourcedBasedAggregateLoader.create(config -> config.eventStore(eventStore));

Commands need to be dispatched to the appropriate command handlers located in aggregates:

var commandMessageDispatcher = AnnotationAwareCommandMessageDispatcher.create(config -> config
    .aggregateLoader(aggregateLoader)
    .scanAggregateClassesIn("com.my.package"));

CommandBus delivers the CommandMessage:

var commandBus = DefaultCommandBus.create(busConfig -> busConfig
    .addCommandMessageInterceptors(
        new LoggingMessageInterceptor<>(),
        new EventStoringCommandInterceptor(eventStore))
    .commandMessageHandler(commandMessageDispatcher));

CommandGateway wraps the CommandBus, allows clients to send command objects:

var commandGateway = LocalCommandGateway.create(config -> config
    .commandAggregateIdResolver(commandMessageDispatcher)
    .commandBus(commandBus));

Event handler object - containing @EventHandler methods - and event processor configuration:

var eventHandler = new EventHandler();

var eventProcessor = TrackingEventProcessor.create(config -> config
    .eventStore(eventStore)
    .addEventMessageDispatchInterceptors(
        new MongoTransactionalMessageInterceptor<>(client),
        new EventMessageAcknowledgeInterceptor())
    .annotationBasedEventMessageHandler(messageHandlerConfig -> messageHandlerConfig
        .addEventMessageHandlerInterceptors((message, chain) -> Mono.just(1)
            .transform(monoLogOnSubscribe(x -> LOG.info("Event handler interceptor - before")))
            .flatMap(x -> chain.handle(message))
            .transform(monoLogOnSuccess(x -> LOG.info("Event handler interceptor - after"))))
        .addEventHandlers(eventHandler)));

eventProcessor.run();