Reputation: 4074
I have a long-running task that generates a set of regular files plus a main file that lists the others.
A scheduler regenerates these files once a day via cron.
The task flow is implemented using RxJava.
The problem: if a request comes in and starts the task (or the scheduler starts it), and another request arrives while the task is still in progress, the second request doesn't wait for the running task to complete; instead it fires another execution.
So the question is: how can I synchronize on task execution so that it runs only once?
This is sample code:
/**
 * Hands out files from the {@code FileRepository}, running the generation
 * pipelines first when the main file does not exist yet.
 *
 * <p>No coordination between callers is visible in this class: every call to
 * {@link #generate(Func0)} builds a new pipeline over {@code pipelines}.
 */
@Service
public class FileService {

    @Autowired
    FileRepository fileRepository;

    @Autowired
    List<Pipeline> pipelines;

    /**
     * Returns the main file, generating it first when the repository reports
     * that it does not exist.
     *
     * @return an observable of the main file
     */
    public Observable<File> getMainFile() {
        if (!fileRepository.isMainFileExists()) {
            return generate(() -> fileRepository.getMainFile());
        }
        return Observable.just(fileRepository.getMainFile());
    }

    /**
     * Returns the file with the given name, generating everything first when
     * the main file is missing.
     *
     * <p>The guard looks at the main file, not at {@code fileName}; presumably
     * the main file marks a completed generation run (confirm with the
     * repository implementation).
     *
     * @param fileName name of the file to fetch from the repository
     * @return an observable of the requested file
     */
    public Observable<File> getFile(String fileName) {
        if (!fileRepository.isMainFileExists()) {
            return generate(() -> fileRepository.getFile(fileName));
        }
        return Observable.just(fileRepository.getFile(fileName));
    }

    /**
     * Builds the generation flow over {@code pipelines} and maps its emissions
     * to the file supplied by {@code whenGenerated}.
     *
     * @param whenGenerated lookup invoked once the flow emits
     * @return an observable of the file returned by {@code whenGenerated}
     */
    Observable<File> generate(Func0<File> whenGenerated) {
        return Observable.from(pipelines)
                // other business logic goes here
                // after task execution finished just get needed file
                .map(isAllPipelinesSuccessful -> whenGenerated.call());
    }

    /**
     * Cron-triggered regeneration ({@code 0 0 4 * * ?}: 04:00 every day in
     * Spring's six-field cron format). The emitted file is ignored.
     */
    @Scheduled(cron = "0 0 4 * * ?")
    void scheduleGeneration() {
        Observable<File> regeneration = generate(() -> fileRepository.getMainFile());
        regeneration.subscribe();
    }
}
It is called from the controller; sample code below:
/**
 * REST endpoints that serve the files produced by {@code FileService} as XML,
 * completing each request asynchronously through a {@link DeferredResult}.
 */
@RestController
public class FileController {

    // ten minutes, expressed in milliseconds
    private static final Long TIMEOUT = 1_000 * 60 * 10L;

    @Autowired
    FileService fileService;

    /**
     * Serves the main file.
     *
     * @return a deferred response that is completed with the file, or with a
     *         500 status when the observable signals an error
     */
    @RequestMapping(value = "/mainfile", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
        DeferredResult<ResponseEntity<InputStreamResource>> pending = new DeferredResult<>(TIMEOUT);
        fileService.getMainFile()
                .map(this::fileToInputStreamResource)
                .map(this::cacheableOk)
                .subscribe(
                        pending::setResult,
                        ex -> pending.setErrorResult(
                                ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null)));
        return pending;
    }

    /**
     * Serves a single file by name.
     *
     * @param filename name of the requested file, taken from the URL path
     * @return a deferred response that is completed with the file; on error it
     *         is completed with 404 when the error's cause is a
     *         {@link FileNotFoundException}, otherwise with 500
     */
    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
        DeferredResult<ResponseEntity<InputStreamResource>> pending = new DeferredResult<>(TIMEOUT);
        fileService.getFile(filename)
                .map(this::fileToInputStreamResource)
                .map(this::cacheableOk)
                .subscribe(
                        pending::setResult,
                        ex -> {
                            HttpStatus status = (ex.getCause() instanceof FileNotFoundException)
                                    ? HttpStatus.NOT_FOUND
                                    : HttpStatus.INTERNAL_SERVER_ERROR;
                            pending.setErrorResult(ResponseEntity.status(status).body(null));
                        });
        return pending;
    }

    /** Wraps the resource in a 200 response that is publicly cacheable for one hour. */
    private ResponseEntity<InputStreamResource> cacheableOk(InputStreamResource resource) {
        return ResponseEntity.ok()
                .cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic())
                .body(resource);
    }
}
Upvotes: 0
Views: 355
Reputation: 4002
I have something like the following, but I think there are way better solutions to this. I am using RxJava2-RC5.
UPDATE::
/**
 * Minimal repository abstraction used by the example; {@code getFile(...)}
 * below consults it to choose between reading the file and running the long
 * process.
 */
interface FileRepository {

    /** @return the flag that {@code getFile(...)} branches on */
    Boolean isMainFileExists();

    /** @return the file, represented as a string in this example */
    String getFile();
}
// Scheduler backed by a pool with exactly one thread: work subscribed on it
// runs one task at a time, so a blocking task holds back every later
// subscription until it finishes. Declared final because it is shared static
// state that is never meant to be swapped out.
private static final Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1));
/**
 * Manual demonstration rather than an asserting test: it fires two requests
 * while the mocked repository reports no main file, re-stubs the mock after
 * three seconds so the file "exists", fires a third request, and then waits
 * so the output printed by the observables can be inspected.
 */
@org.junit.Test
public void schedulerTest123() throws Exception {
    FileRepository fRepo = mock(FileRepository.class);
    when(fRepo.getFile()).thenReturn("");
    when(fRepo.isMainFileExists()).thenReturn(false);

    // Every simulated caller does the same thing: subscribe to getFile(...).
    Runnable request = () -> getFile(fRepo, executorService).subscribe();

    Thread firstCaller = new Thread(request);
    Thread secondCaller = new Thread(request);
    firstCaller.start();
    secondCaller.start();

    Thread.sleep(3_000);

    // From here on the repository pretends that generation has finished.
    when(fRepo.getFile()).thenReturn("DasFile");
    when(fRepo.isMainFileExists()).thenReturn(true);

    Thread lateCaller = new Thread(request);
    lateCaller.start();

    // Keep the test alive long enough for the asynchronous work to print.
    Thread.sleep(5_000);
}
/**
 * Builds an observable that decides, at subscription time, whether to read
 * the file from the repository or to run the long process.
 *
 * <p>The decision lives inside {@code defer}, so {@code isMainFileExists()}
 * is evaluated on {@code scheduler} when the subscription actually runs, not
 * when this method is called.
 *
 * @param fileRepo  repository that is queried for the file
 * @param scheduler scheduler on which the deferred decision is subscribed
 * @return an observable emitting either the repository file or the result of
 *         the long process
 */
private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) {
    Observable<String> lazyLookup = Observable.defer(() -> {
        try {
            if (!fileRepo.isMainFileExists()) {
                // Nothing generated yet: run the long process on the current scheduler.
                return startLongProcess()
                        .doOnNext(s -> printCurrentThread("Push long processValue"));
            }
            // File is there: read it on the io scheduler.
            return Observable.fromCallable(fileRepo::getFile)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(s -> printCurrentThread("Get File from Repo"));
        } catch (Exception ex) {
            return Observable.error(ex);
        }
    });
    return lazyLookup
            .subscribeOn(scheduler)
            .doOnSubscribe(disposable -> printCurrentThread("SUB"));
}
/**
 * Stand-in for the expensive generation step: when subscribed, it prints the
 * current thread, sleeps for five seconds and emits a fixed value.
 *
 * @return an observable that emits {@code "leFile"} after the delay
 */
private Observable<String> startLongProcess() {
    return Observable.fromCallable(() -> {
        printCurrentThread("Doing LongProcess");
        final long simulatedWorkMillis = 5_000;
        Thread.sleep(simulatedWorkMillis);
        final String produced = "leFile";
        return produced;
    });
}
/**
 * Prints the given label followed by {@code _} and the calling thread to
 * standard output.
 *
 * @param additional label placed in front of the thread description
 */
private void printCurrentThread(String additional) {
    final Thread caller = Thread.currentThread();
    System.out.println(additional + "_" + caller);
}
Upvotes: 1