Reputation: 3750
I'm trying to create an Observable from an array of items that each regularly check for server updates and then sends an action when it gets the result it wants for each item.
The answer below is helpful, however not quite what I'm looking for
This is the other approach I've been trying:
export function handleProcessingScenes(action$,store) {
return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
.switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
.filter(scene => scene.scenePanoTask)
.mergeMap(scene => updateScene(scene))
}
function updateScene(scene) {
return Observable.interval(3000)
.flatMap(() => requestSceneUpdates(scene.id))
.takeWhile(res => res.payload.status < 4)
.timeout(600000, Observable.throw(new Error('Timeout')))
}
The API function returns an Observable
export function requestSceneUpdates(sceneId){
console.log('requestSceneUpdate')
const request = fetch(`${API_URL}/scene/task/${sceneId}/update`, {
method: 'get',
credentials: 'include',
crossDomain: true,
}).then(res => res.json())
return Observable.fromPromise(request)
}
However this only calls the 'requestSceneUpdate' function once.
I basically want to call that function every 3 seconds for each scene in scenesByLocation. I then want to return an action when each one is finished.
The epic that I have for a single scene is
export function sceneProcessingUpdate(action$) {
return action$.ofType(REQUEST_SCENE_PROCESSING_TASK_SUCCESS)
.switchMap(({task}) =>
Observable.timer(0, 30000).takeUntil(action$.ofType( REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS))
.exhaustMap(() =>
requestSceneUpdates(task.id)
.map((res) => {
if (res.error)
return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: res.message }
else if(res.payload.status === 4)
return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task: res.payload }
else
return requestSceneProcessingTaskMessage(res.payload)
})
.catch(err => { return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: err } })
)
)
}
Upvotes: 0
Views: 94
Reputation: 3750
This worked in the end, @Andrew fixed the first part.
export function handleProcessingScenes(action$,store) {
return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
.switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
.filter(scene => scene.scenePanoTask)
.flatMap(scene => {
return Observable.timer(0, 5000).takeUntil(action$.ofType( REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS))
.exhaustMap(() =>
requestSceneUpdates(scene.id)
.map((res) => {
if (res.error)
return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: res.message }
else if(res.payload.status === 4)
return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task: res.payload }
else
return requestSceneProcessingTaskMessage(res.payload)
})
.catch(err => { return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_FAILED, message: err } })
)
})
}
Upvotes: 0
Reputation: 8295
I think you need something like this. The idea is to retry the scene update if it fails, after 3 seconds and not use a timer.
export function handleProcessingScenes(action$) {
return action$.ofType(REQUEST_ALL_SCENES_BY_LOCATION_FULFILLED)
.switchMap(({ scenesByLocation }) => Observable.from(scenesByLocation))
.filter(scene => scene.scenePanoTask)
.mergeMap(scene => updateScene(scene));
}
function updateScene(scene) {
return requestSceneUpdates(scene.id)
.map((res) => {
if (res.error)
throw res.error;
else if (res.payload.status === 4)
return { type: REQUEST_SCENE_PROCESSING_TASK_UPDATE_SUCCESS, task: res.payload }
else
return requestSceneProcessingTaskMessage(res.payload)
})
.retryWhen(errors => errors.delay(3000));
}
Upvotes: 1