Reputation: 457
I want to use write behind strategy of Apache Ignite with my Spring boot Application with mysql database. I want to write cache data to mysql database with write behind asynchronously. I am new with Apache Ignite and I am referring docs from https://apacheignite.readme.io/docs/.
Below is my Spring boot configuration for using write behind strategy of Apache Ignite.
@Bean
public Ignite igniteInstance() {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName("ignite-1");
cfg.setPeerClassLoadingEnabled(true);
CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache");
ccfg2.setIndexedTypes(Long.class, Contact.class);
ccfg2.setWriteBehindEnabled(true);
ccfg2.setWriteBehindFlushFrequency(1000);
ccfg2.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
ccfg2.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setBackups(1);
ccfg2.setWriteBehindFlushSize(0);
ccfg2.setWriteBehindFlushThreadCount(1);
ccfg2.setWriteBehindBatchSize(1);
//ccfg2.setReadThrough(true);
//ccfg2.setWriteThrough(true);
// Memory Configuration
DataStorageConfiguration storageCfg = new DataStorageConfiguration();
DataRegionConfiguration defaultDataRegionCfg = storageCfg.getDefaultDataRegionConfiguration();
defaultDataRegionCfg.setPersistenceEnabled(false); // Only Memory
defaultDataRegionCfg.setMaxSize(1024 * 1024 * 256); // 256MB
defaultDataRegionCfg.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(storageCfg);
CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>();
f2.setDataSource(datasource);
f2.setDialect(new MySQLDialect());
JdbcType jdbcContactType = new JdbcType();
jdbcContactType.setCacheName("ContactCache");
jdbcContactType.setKeyType(Long.class);
jdbcContactType.setValueType(Contact.class);
jdbcContactType.setDatabaseTable("contact");
jdbcContactType.setDatabaseSchema("demo");
jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"),
new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"),
new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
f2.setTypes(jdbcContactType);
ccfg2.setCacheStoreFactory(f2);
TcpCommunicationSpi spi = new TcpCommunicationSpi();
spi.setMessageQueueLimit(1000);
CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
ccfg.setIndexedTypes(Long.class, Person.class);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushFrequency(2000);
ccfg.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC);
ccfg.setWriteBehindFlushSize(0);
ccfg.setWriteBehindFlushThreadCount(10);
ccfg.setWriteBehindBatchSize(10);
ccfg.setBackups(1);
ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
// ccfg.setDataRegionName(storageCfg);
// ccfg.setReadThrough(true);
// ccfg.setWriteThrough(true);
CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
f.setDataSource(datasource);
f.setDialect(new MySQLDialect());
JdbcType jdbcType = new JdbcType();
jdbcType.setCacheName("PersonCache");
jdbcType.setKeyType(Long.class);
jdbcType.setValueType(Person.class);
jdbcType.setDatabaseTable("person");
jdbcType.setDatabaseSchema("demo");
jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"),
new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"),
new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"),
new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"),
new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"),
new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"),
new JdbcTypeField(Types.VARCHAR, "birth_date", String.class, "birthDate"));
f.setTypes(jdbcType);
ccfg.setCacheStoreFactory(f);
Ignite ignite = Ignition.start(cfg);
ignite.cluster().active(true);
return ignite;
}
With this configuration, If I set write through to true then data will successfully inserted to database. But write behind is not working with this configuration. I have refer https://apacheignite.readme.io/v2.7/docs/3rd-party-store page for write-behind configuration but I think it is not working. Please let me know if I miss something as I am newbie with Ignite.
My Repostiary code -
@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {
List<Person> findByFirstNameAndLastName(String firstName, String lastName);
@Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
List<Contact> selectContacts(String firstName, String lastName);
@Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
List<List<?>> selectContacts2(String firstName, String lastName);
}
Model class -
@QueryGroupIndex.List(
@QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {
private static final long serialVersionUID = -1271194616130404625L;
private static final AtomicLong ID_GEN = new AtomicLong();
@QuerySqlField(index = true)
private Long id;
@QuerySqlField(index = true)
@QuerySqlField.Group(name = "idx1", order = 0)
private String firstName;
@QuerySqlField(index = true)
@QuerySqlField.Group(name = "idx1", order = 1)
private String lastName;
private Gender gender;
private Date birthDate;
private String country;
private String city;
private String address;
@Transient
private List<Contact> contacts = new ArrayList<>();
public void init() {
this.id = ID_GEN.incrementAndGet();
}
Controller class -
@RestController
@RequestMapping("/person")
public class PersonController {
private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);
@Autowired
PersonRepository repository;
@PostMapping
public Person add(@RequestBody Person person) {
person.init();
return repository.save(person.getId(), person);
}
@PutMapping
public Person update(@RequestBody Person person) {
return repository.save(person.getId(), person);
}
@DeleteMapping("/{id}")
public void delete(Long id) {
repository.delete(id);
}
@GetMapping("/{id}")
public Person findById(@PathVariable("id") Long id) {
return repository.findOne(id);
}
If anyone want to access full code base then It is available on - https://github.com/gotidhavalh/ignite-rest-service
Upvotes: 1
Views: 764
Reputation: 19313
You need to enable both
ccfg.setWriteThrough(true);
and
ccfg.setWriteBehindEnabled(true);
for the Write Behind to work. Please also make sure to check its flush timeout (5s by default).
Upvotes: 2