user3458271

Reputation: 660

Hazelcast Jet Job is not able to access spring context

I am creating a Spring Boot application in which I am trying to use Hazelcast Jet. In a plain Spring application it works properly, but in the Spring Boot application the Jet job is not able to access the beans. The beans are created dynamically, so I have to access them through the AppBeans class.

For dependencies I have taken these three:

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-all</artifactId>
    <version>4.2</version>
</dependency>
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>4.5</version>
</dependency>
<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast-spring</artifactId>
    <version>4.2</version>
</dependency>

To create the Hazelcast instance I am using the code below:

@SpringAware
public class DataFactory implements Serializable {
    SpringManagedContext springManagedContext = (SpringManagedContext) AppBeans.getBean("springManagedContext");

    public JetInstance buildJetInstance() {
        if (!EtlObjects.jetStart) {
            EtlObjects.jetStart = true;
            JetConfig jetConfig = new JetConfig();
            jetConfig.getHazelcastConfig().setProperty("hazelcast.logging.type", "log4j");
            jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
            jetConfig.configureHazelcast(c -> {
                c.getNetworkConfig().setReuseAddress(true);
                c.setClusterName(UUID.randomUUID().toString());
                c.setManagedContext(springManagedContext);
                c.getNetworkConfig().setPort(9493);
                c.getNetworkConfig().setPublicAddress("localhost");
                c.getNetworkConfig().setPortAutoIncrement(true);
                c.getNetworkConfig().getJoin().getAutoDetectionConfig().setEnabled(false);
                c.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
                c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).setMembers(Arrays.asList(new String[] {"localhost"}));
            });
            EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);
        }
        return EtlObjects.jetInstance;
    }
}

And in the job, when I try to access the beans, I get null:

@Component
public class JDBCDataSource implements ISourceBatch, Serializable {
    @Override
    public BatchStage<Object> readSource(Pipeline pipeline) {
        BatchSource<Object> jdbcSource = Sources
            .jdbc(() -> {
                // Here it gives null: cannot access the DataSource bean
                Connection conn = ((DataSource) AppBeans.getBean(thissource.get("datasourceName").toString())).getConnection();
                return conn;
            },
            (con, parallelism, index) -> {
                ..........my code
            });
    }
}

And the AppBeans class is as follows:

@Component("s")
public class AppBeans implements ApplicationContextAware,
        ServletContextListener, BeanDefinitionRegistryPostProcessor, Serializable {
    private static ApplicationContext CONTEXT;
    private static ServletContext SERVLETCONTEXT;
    private static BeanDefinitionRegistry REGISTRY;
    private transient AutowireCapableBeanFactory beanFactory;

    public void setApplicationContext(ApplicationContext context) throws BeansException {
        CONTEXT = context;
        beanFactory = context.getAutowireCapableBeanFactory();
    }

    public static Object getBean(String beanName) {
        return CONTEXT.getBean(beanName);
    }
}

Also, for starting the application:

@SpringBootApplication(exclude = {MongoAutoConfiguration.class, MongoDataAutoConfiguration.class, FreeMarkerAutoConfiguration.class})
@ComponentScan(basePackages = {"com"})
@EnableAutoConfiguration
public class AppApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(AppApplication.class, args);
    }

    @Bean
    public SpringManagedContext springManagedContext() {
        return new SpringManagedContext();
    }
}

Please let me know how I can give Jet sources access to the Spring context. Let me know if anything is not clear.

Upvotes: 0

Views: 440

Answers (1)

Neil Stevenson

Reputation: 3150

You need to pass the Spring Managed Context to the job source, which can then initialize itself with Spring's Application Context.

Try

    static Pipeline buildPipeline() {
        return Pipeline.create()
                .readFrom(mySource())
                .writeTo(Sinks.logger())
                .getPipeline();
    }
    
    static BatchSource<String> mySource() {
        return SourceBuilder.batch("72743077", 
                        jobContext -> new MyBatchSource(jobContext.hazelcastInstance().getConfig().getManagedContext()))
                    .fillBufferFn(MyBatchSource::fillBufferFn)
                    .build();
    }

    @SpringAware
    static class MyBatchSource implements ApplicationContextAware {
        private ApplicationContext applicationContext;
        
        MyBatchSource(ManagedContext managedContext) {
            if (managedContext instanceof SpringManagedContext) {
                SpringManagedContext springManagedContext = (SpringManagedContext) managedContext;
                springManagedContext.initialize(this);
            }
        }
        
        void fillBufferFn(SourceBuilder.SourceBuffer<String> buffer) {
            Config config = this.applicationContext.getBean(Config.class);
            buffer.add("HELLO FROM " + config.getClusterName());
            buffer.close();
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

To make it work, ensure the Spring Managed Context is plugged into the Hazelcast configuration before creating the server instance.

    @Bean
    public SpringManagedContext managedContext() {
        return new SpringManagedContext();
    }
    
    @Bean
    public Config config(ManagedContext managedContext) {
        Config config = new Config();
        config.setManagedContext(managedContext);
        // ... other settings ...
        return config;
    }

Also, if you upgrade to 5.1.2 (or whatever the latest release is), the only dependency you need is com.hazelcast:hazelcast-spring at that version, for example com.hazelcast:hazelcast-spring:5.1.2.
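
For intuition about why the source has to be initialized explicitly, here is a plain-Java sketch that uses neither Hazelcast nor Spring. It assumes the job's source reaches the member in serialized form, in which case transient references are not carried along and have to be re-injected on the receiving side. `BeanLookup`, `LookupAware`, `SketchManagedContext` and `SketchSource` are hypothetical names invented for this illustration; they only mimic the roles of the application context, `ApplicationContextAware`, the managed context and the batch source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an application context: looks up a bean by name.
interface BeanLookup {
    Object getBean(String name);
}

// Hypothetical stand-in for an "aware" callback interface.
interface LookupAware {
    void setLookup(BeanLookup lookup);
}

// Hypothetical stand-in for a managed context: injects the lookup into objects it is given.
final class SketchManagedContext {
    private final BeanLookup lookup;

    SketchManagedContext(BeanLookup lookup) {
        this.lookup = lookup;
    }

    Object initialize(Object obj) {
        if (obj instanceof LookupAware) {
            ((LookupAware) obj).setLookup(lookup);
        }
        return obj;
    }
}

// Hypothetical source object: serializable, but its lookup reference is transient.
final class SketchSource implements Serializable, LookupAware {
    private static final long serialVersionUID = 1L;

    private final String beanName;
    private transient BeanLookup lookup; // skipped by Java serialization

    SketchSource(String beanName) {
        this.beanName = beanName;
    }

    @Override
    public void setLookup(BeanLookup lookup) {
        this.lookup = lookup;
    }

    boolean hasLookup() {
        return lookup != null;
    }

    Object resolve() {
        if (lookup == null) {
            throw new IllegalStateException("lookup was not injected");
        }
        return lookup.getBean(beanName);
    }
}

final class ReinjectionSketch {
    // Serializes and deserializes a value, as a rough model of shipping it to another member.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        BeanLookup lookup = name -> "bean:" + name;

        SketchSource original = new SketchSource("myDataSource");
        original.setLookup(lookup);

        SketchSource copy = roundTrip(original);
        System.out.println("lookup survived round trip: " + copy.hasLookup());

        // Re-inject on the receiving side, analogous to calling initialize(this) in the source.
        new SketchManagedContext(lookup).initialize(copy);
        System.out.println("resolved after re-injection: " + copy.resolve());
    }
}
```

The sketch only models the mechanism. In the real pipeline the re-injection is done by `SpringManagedContext.initialize(...)` as in `MyBatchSource` above, and whether this is the exact cause of the null in the question depends on details the question does not show.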

Upvotes: 0
