Reputation: 2608
We are using SeaweedFS, a file system that stores our (image) files and exposes them through a REST API. We are trying to move data from one server to another.
There are a few levels of data directories. The basic pattern under which an image is stored is
http://{server}:8888/ballymore-project-wave/snapshots/recordings/{year}/{month}/{day}/{hour}/00_00_000.jpg
Each directory level returns its own JSON response, such as
{
  "Path": "/ballymore-project-wave/snapshots/recordings/",
  "Files": null,
  "Directories": [
    {
      "Name": "2016",
      "Id": 91874
    },
    {
      "Name": "2017",
      "Id": 1538395
    }
  ],
  "Limit": 100,
  "LastFileName": "",
  "ShouldDisplayLoadMore": false
}
The above response is returned when you request the years for recordings; the same shape is returned for months, days and hours. The response changes slightly when you fetch a single hour:
{
  "Path": "/ballymore-project-wave/snapshots/recordings/2016/11/02/01/",
  "Files": [
    {
      "name": "00_00_000.jpg",
      "fid": "29515,744a5a496b97ff98"
    },
    {
      "name": "00_01_000.jpg",
      "fid": "29514,744a5aa52ea3cf3d"
    }
  ],
  "Directories": null,
  "Limit": 100,
  "LastFileName": "02_15_000.jpg",
  "ShouldDisplayLoadMore": true
}
Now we need to move all this data from one server to another. I wrote a script for it:
defp move_snapshots(exids) do
  exids
  |> Enum.each(fn (exid) ->
    request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
    |> Enum.sort |> Enum.each(fn (year) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (month) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (day) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
          |> Enum.sort |> Enum.each(fn (hour) ->
            request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
            |> Enum.sort |> Enum.each(fn (file) ->
              exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
              |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
            end)
          end)
        end)
      end)
    end)
  end)
end
This is the main function. exids is the list of camera string identifiers; for the example above it is ballymore-project-wave.
In the above script, I check each level, go deeper whenever something is present, and at the last level check whether it is a valid image:
defp exist_on_seaweed?(url) do
  hackney = [pool: :seaweedfs_download_pool, recv_timeout: 30_000_000]
  case HTTPoison.get("#{@seaweedfs}#{url}", ["Accept": "application/json"], hackney: hackney) do
    {:ok, %HTTPoison.Response{status_code: 200, body: data}} -> {:ok, data}
    _error ->
      :not_found
  end
end
defp copy_or_skip(:not_found, _path), do: :noop
defp copy_or_skip({:ok, data}, path) do
  hackney = [pool: :seaweedfs_upload_pool]
  case HTTPoison.post("#{@seaweedfs_new}#{path}", {:multipart, [{path, data, []}]}, [], hackney: hackney) do
    {:ok, _response} -> Logger.info "[seaweedfs_save]"
    {:error, error} -> Logger.info "[seaweedfs_save] [#{inspect error}]"
  end
end
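For reference, request_from_seaweedfs/3 is not shown above; it fetches a directory listing and extracts one attribute from each entry. A minimal sketch of what such a helper could look like (assuming Poison for JSON decoding; this is an illustration, not necessarily the exact implementation):

defp request_from_seaweedfs(url, type, attribute) do
  hackney = [pool: :seaweedfs_download_pool]

  # GET the listing, decode it, and pull `attribute` out of every entry under
  # `type` ("Directories"/"Name" or "Files"/"name"); return [] on any failure.
  with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <-
         HTTPoison.get(url, ["Accept": "application/json"], hackney: hackney),
       {:ok, %{^type => entries}} when is_list(entries) <- Poison.decode(body) do
    Enum.map(entries, fn entry -> entry[attribute] end)
  else
    _ -> []
  end
end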
This is all working fine, but I have an issue with resuming the process when it crashes or breaks for some reason, and I need guidance or ideas for that. As you can see, if there are 200 camera exids and the run breaks at 100 (or earlier), it will resume, but from the very start, and we cannot delete anything on the old server until the full move has completed. Any help will be appreciated. Also, if you think the code itself could be improved, that would be helpful too.
Upvotes: 1
Views: 144
Reputation: 75840
Until you post the actual stacktrace or details of the error you are encountering, it's not possible to figure out exactly what's wrong. But for starters, here are some suggestions that might help:
You should break down your move_snapshots method into something more understandable, maybe using something like Enum.reduce/3 with recursion and calling your copy_or_skip method as the base case.
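For example, here is a rough sketch of that idea, replacing the six nested Enum.each calls with a recursive walk over the four directory levels and reusing the question's request_from_seaweedfs/3, exist_on_seaweed?/1 and copy_or_skip/2. Treat it as an outline rather than a drop-in replacement:

defp move_snapshots(exids) do
  Enum.each(exids, fn exid ->
    walk("/#{exid}/snapshots/recordings/", 4)
  end)
end

# While depth > 0 we are at a directory level (year/month/day/hour):
# list the sub-directories and recurse into each one.
defp walk(path, depth) when depth > 0 do
  request_from_seaweedfs("#{@seaweedfs}#{path}", "Directories", "Name")
  |> Enum.sort()
  |> Enum.each(fn dir -> walk("#{path}#{dir}/", depth - 1) end)
end

# At depth 0 we are inside an hour directory: list the files and copy each one.
defp walk(path, 0) do
  request_from_seaweedfs("#{@seaweedfs}#{path}?limit=3600", "Files", "name")
  |> Enum.sort()
  |> Enum.each(fn file ->
    exist_on_seaweed?("#{path}#{file}")
    |> copy_or_skip("#{path}#{file}")
  end)
end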
Try wrapping your copy_or_skip method implementation inside a try/rescue, rescuing any exceptions, logging them and moving on to the next one.
defp copy_or_skip(args, path) do
  # Your Implementation
rescue
  error -> Logger.error("Exception caught on #{inspect(path)}\n#{inspect(error)}")
end
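Note that the rescue clause can be attached directly to the function body like this, without an explicit try; any exception raised while copying a single file is then logged and that file skipped, instead of crashing the whole traversal.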
You could also just go through the list of all files and add the valid paths to some "Worker" in a job processing library like Que or Toniq. The library will perform all the move operations and mark them successful or failed. You could then go back to see which operations failed and figure out what caused them, or automatically restart the failed ones.
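A hedged sketch of the Que variant (assuming Que's documented Que.Worker / Que.add API; SnapshotMover and SnapshotStorage are placeholder names, and the question's exist_on_seaweed?/1 and copy_or_skip/2 would need to be made public for the worker to call them):

defmodule SnapshotMover do
  use Que.Worker

  # Each enqueued job carries one file path; Que records whether it
  # succeeded or failed so the failed ones can be inspected or retried.
  def perform(path) do
    path
    |> SnapshotStorage.exist_on_seaweed?()
    |> SnapshotStorage.copy_or_skip(path)
  end
end

# During the traversal, enqueue each file path instead of copying it inline:
Que.add(SnapshotMover, "/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")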
Some more tips on improving code reliability and performance:
- Use Stream, or better yet, Flow to divide the tasks and process them in parallel (see the sketch after this list).
- Run each move in a separate Task process, ideally managed by a Supervisor. (Optionally use a Pool.)
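As a minimal illustration of the parallel-processing tip, using only the standard library's Task.async_stream/3 (Flow would look similar, with partitioning and back-pressure on top): file_paths is assumed to be the flat list of snapshot paths collected by the traversal, and copy_or_skip is assumed to rescue its own exceptions as suggested above, so one bad file does not crash the whole stream.

file_paths
|> Task.async_stream(fn path ->
     # each path is checked and copied in its own task, up to 8 at a time
     path
     |> exist_on_seaweed?()
     |> copy_or_skip(path)
   end, max_concurrency: 8, timeout: :infinity)
|> Stream.run()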
Upvotes: 1