user1556622
user1556622

Reputation:

How to configure Async and Sync Event publishers using spring

I am trying to implement an event framework using spring events.I came to know that the default behavior of spring event framework is sync. But during spring context initialization if it finds a bean with id applicationEventMulticaster it behaves Async.

Now i want to have both sync and async event publishers in my application, because some of the events needs to be published sync. I tried to configure sync event multicaster using SysncTaskExecutor, but i cant find a way to inject it into my AsyncEventPublisher's applicationEventPublisher property. My spring configuration file is as below

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="shutdown">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
        <property name="WaitForTasksToCompleteOnShutdown" value="true" />
    </bean>  

    <bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />

    <bean id="customEventPublisher" class="x.spring.event.CustomEventPublisher" />
    <bean id="customEventHandler" class="x.spring.event.CustomEventHandler" />  
    <bean id="eventSource" class="x.spring.event.EventSource" /> 
    <bean id="responseHandler" class="x.spring.event.ResponseHandler" /> 
    <bean id="syncEventSource" class="x.spring.event.syncEventSource" /> 


    <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
        <property name="taskExecutor" ref="taskExecutor" />         
    </bean>    

    <bean id="syncApplicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
        <property name="taskExecutor" ref="syncTaskExecutor" />         
    </bean>    

Can anyone help me out here ?

Upvotes: 13

Views: 16879

Answers (3)

Michael
Michael

Reputation: 91

I just had to work this out for myself. By default events are sent asynchronously except if you implement a marker interface, in my case I called it SynchronousEvent. You'll need an 'executor' in your config too (I omitted mine as it's quite customised).

@EnableAsync
@SpringBootConfiguration
public class BigFishConfig {

    @Autowired AsyncTaskExecutor executor;

    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster() {
        log.debug("creating multicaster");
        return new SimpleApplicationEventMulticaster() {
            @Override
            public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
                ResolvableType type = eventType != null ? eventType : ResolvableType.forInstance(event);
                if (event instanceof PayloadApplicationEvent 
                        && ((PayloadApplicationEvent<?>) event).getPayload() instanceof SynchronousEvent) 
                    getApplicationListeners(event, type).forEach(l -> invokeListener(l, event));
                else 
                    getApplicationListeners(event, type).forEach(l -> executor.execute(() -> invokeListener(l, event)));
            }
        };
    }
...

Upvotes: 9

zg_spring
zg_spring

Reputation: 359

i am not good for edit with stackoverflow. please forgive me.

  1. SyncTaskExecutor

I don't need to add comment that you can know well. this is synchronized. this Executor run task in sequence, and blocked for every task.

 public class SyncTaskExecutor implements TaskExecutor, Serializable {

/**
 * Executes the given {@code task} synchronously, through direct
 * invocation of it's {@link Runnable#run() run()} method.
 * @throws IllegalArgumentException if the given {@code task} is {@code null}
 */
@Override
public void execute(Runnable task) {
    Assert.notNull(task, "Runnable must not be null");
    task.run();
}

}

  1. SimpleAsyncTaskExecutor

This class is very large, so i just choose section of code. If you give threadFactory, will be retrieved Thread from this factory, or will be create new Thread.

    protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}
  1. ThreadPoolTaskExecutor

this class use jdk5's current pkg ThreadPoolTaskExecutor. but spring encapsulate functionality. Spring is good at this way, jdk6's current and jdk7'scurrent pkg have some difference. this will be get Thread from ThreadPool and reuse it, execute every task Asynchronized. If you want to know more detail, see JKD source code.

Upvotes: 2

zg_spring
zg_spring

Reputation: 359

no, you can't do that, the spring initApplicationEventMulticaster just init only one, and the BeanName must be applicationEventMulticaster. so you just can choose one of below Executor:

- org.springframework.core.task.SyncTaskExecutor

- org.springframework.core.task.SimpleAsyncTaskExecutor

- your own Executor: org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

any way, you can modify org.springframework.context.event.SimpleApplicationEventMulticaster to add your logic, then you can control whether need to Sync/Async

    /**
 * Initialize the ApplicationEventMulticaster.
 * Uses SimpleApplicationEventMulticaster if none defined in the context.
 * @see org.springframework.context.event.SimpleApplicationEventMulticaster
 */
protected void initApplicationEventMulticaster() {
    ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
        this.applicationEventMulticaster =
                beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
        if (logger.isDebugEnabled()) {
            logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
        }
    }
    else {
        this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
        beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
        if (logger.isDebugEnabled()) {
            logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
                    APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
                    "': using default [" + this.applicationEventMulticaster + "]");
        }
    }
}

Upvotes: 5

Related Questions