activiti-event-logger写入es

1085 查看

引入spring-data-elasticsearch

<!-- http://mvnrepository.com/artifact/org.springframework.data/spring-data-elasticsearch -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

自定义event flusher

public class EsEventFlusher extends AbstractEventFlusher {

    private static final Logger logger = LoggerFactory.getLogger(EsEventFlusher.class);

    @Override
    public void closing(CommandContext commandContext) {
        for (EventLoggerEventHandler eventHandler : eventHandlers) {
            try {
                EventLogEntryEntity entryEntity = eventHandler.generateEventLogEntry(commandContext);
                EventLogEs es = EventLogEs.buildFrom(entryEntity);
                EventLogEsRepository repository = ApplicationContextHolder.getContext().getBean(EventLogEsRepository.class);
                repository.save(es);
                logger.info("###{}",es);
                repository.refresh();
            } catch (Exception e) {
                logger.warn("Could not create event log", e);
            }
        }
    }
}

自定义event logger(使其使用自定义的event flusher)

public class EsEventLogger extends EventLogger{

    public static final String EVENT_FLUSHER_KEY = "eventFlusher";

    public EsEventLogger(Clock clock, ObjectMapper objectMapper) {
        super(clock, objectMapper);
    }

    @Override
    public void onEvent(ActivitiEvent event) {
        EventLoggerEventHandler eventHandler = getEventHandler(event);
        if (eventHandler != null) {

            // Events are flushed when command context is closed
            CommandContext currentCommandContext = Context.getCommandContext();
            EventFlusher eventFlusher = (EventFlusher) currentCommandContext.getAttribute(EVENT_FLUSHER_KEY);

            if (eventHandler != null && eventFlusher == null) {

                eventFlusher = createEventFlusher();
                if (eventFlusher == null) {
                    eventFlusher = new EsEventFlusher(); // change to es event logger
                }
                currentCommandContext.addAttribute(EVENT_FLUSHER_KEY, eventFlusher);

                currentCommandContext.addCloseListener(eventFlusher);
                currentCommandContext
                        .addCloseListener(new CommandContextCloseListener() {

                            @Override
                            public void closing(CommandContext commandContext) {
                            }

                            @Override
                            public void closed(CommandContext commandContext) {
                                // For those who are interested: we can now broadcast the events were added
                                if (listeners != null) {
                                    for (EventLoggerListener listener : listeners) {
                                        listener.eventsAdded(EsEventLogger.this);
                                    }
                                }
                            }

                        });
            }

            eventFlusher.addEventHandler(eventHandler);
        }
    }
}

启动时指定该event logger

@Bean
    public CommandLineRunner init() {
        return new CommandLineRunner() {
            public void run(String... strings) throws Exception {
                //开启event logging
                runtimeService.addEventListener(new EsEventLogger(processEngineConfiguration.getClock(),objectMapper));
            }
        };
    }

es数据

http://192.168.99.100:9200/_plugin/head/

TODO

需要考虑回滚的情况,跟mybatis的事务绑定在一起。