Junaid Farooq

Reputation: 2608

How to resume an Elixir task from where it broke (Elixir / Phoenix)

We are using SeaweedFS, a file system for storing (image) files that works as a REST API. We are trying to move data from one server to another.

There are a few levels of data directories. The basic pattern under which an image is stored is:

http://{server}:8888/ballymore-project-wave/snapshots/recordings/{year}/{month}/{day}/{hour}/00_00_000.jpg

Each directory level returns its own JSON response, such as:

{
    "Path": "/ballymore-project-wave/snapshots/recordings/",
    "Files": null,
    "Directories": [
        {
            "Name": "2016",
            "Id": 91874
        },
        {
            "Name": "2017",
            "Id": 1538395
        }
    ],
    "Limit": 100,
    "LastFileName": "",
    "ShouldDisplayLoadMore": false
}

The above response is what you get when fetching the years for recordings; the responses for months, days, and hours have the same shape. There is a slight change when you fetch a single hour:

{
    "Path": "/ballymore-project-wave/snapshots/recordings/2016/11/02/01/",
    "Files": [
        {
            "name": "00_00_000.jpg",
            "fid": "29515,744a5a496b97ff98"
        },
        {
            "name": "00_01_000.jpg",
            "fid": "29514,744a5aa52ea3cf3d"
        }
    ],
    "Directories": null,
    "Limit": 100,
    "LastFileName": "02_15_000.jpg",
    "ShouldDisplayLoadMore": true
}

Now we need to move all this data from one server to another. I wrote a script for it:

  defp move_snapshots(exids) do
    exids
    |> Enum.each(fn (exid) ->
      request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/", "Directories", "Name")
      |> Enum.sort |> Enum.each(fn (year) ->
        request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/", "Directories", "Name")
        |> Enum.sort |> Enum.each(fn (month) ->
          request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/", "Directories", "Name")
          |> Enum.sort |> Enum.each(fn (day) ->
            request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/", "Directories", "Name")
            |> Enum.sort |> Enum.each(fn (hour) ->
              request_from_seaweedfs("#{@seaweedfs}/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/?limit=3600", "Files", "name")
              |> Enum.sort |> Enum.each(fn (file) ->
                exist_on_seaweed?("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
                |> copy_or_skip("/#{exid}/snapshots/recordings/#{year}/#{month}/#{day}/#{hour}/#{file}")
              end)
            end)
          end)
        end)
      end)
    end)
  end

This is the main function. exids is meant to be the string identifiers of all cameras; for the above example, it is ballymore-project-wave.

In the above script, I check each level, and if something is present, I go deeper. At the last level, I check whether it's a valid image:

  defp exist_on_seaweed?(url) do
    hackney = [pool: :seaweedfs_download_pool, recv_timeout: 30_000_000]
    case HTTPoison.get("#{@seaweedfs}#{url}", ["Accept": "application/json"], hackney: hackney) do
      {:ok, %HTTPoison.Response{status_code: 200, body: data}} -> {:ok, data}
      _error ->
        :not_found
    end
  end

  defp copy_or_skip(:not_found, _path), do: :noop
  defp copy_or_skip({:ok, data}, path) do
    hackney = [pool: :seaweedfs_upload_pool]
    case HTTPoison.post("#{@seaweedfs_new}#{path}", {:multipart, [{path, data, []}]}, [], hackney: hackney) do
      {:ok, _response} -> Logger.info "[seaweedfs_save]"
      {:error, error} -> Logger.info "[seaweedfs_save] [#{inspect error}]"
    end
  end

This is all working fine, but I have a slight issue with resuming the task when it crashes or breaks for some reason, and I need guidance/ideas for this. As you can see, if there are 200 camera exids and it breaks on the 100th, or maybe earlier, it will resume, but from the very start. We cannot delete things on the old server until the move is fully complete. Any help will be appreciated. Also, if you think there could be some improvements in the code, that would be helpful.

Upvotes: 1

Views: 144

Answers (1)

Sheharyar

Reputation: 75840

Until you post the actual stacktrace or details of the error you are encountering, it's not possible to figure out exactly what's wrong. But for starters, here are some suggestions that might help:

  • You should break down your move_snapshots function into something more understandable, maybe using something like Enum.reduce/3 with recursion and calling your copy_or_skip function as the base case, as in the sketch below.
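
    A minimal recursive sketch of that idea (the traverse name and the depth argument are assumptions, not your code), assuming request_from_seaweedfs, exist_on_seaweed? and copy_or_skip keep their current signatures:

    # Base case: at depth 0 we are inside an hour directory,
    # so list the files and copy each one.
    defp traverse(path, 0) do
      request_from_seaweedfs("#{@seaweedfs}#{path}?limit=3600", "Files", "name")
      |> Enum.sort()
      |> Enum.each(fn file ->
        exist_on_seaweed?("#{path}#{file}")
        |> copy_or_skip("#{path}#{file}")
      end)
    end

    # Recursive case: list subdirectories and descend one level.
    defp traverse(path, depth) do
      request_from_seaweedfs("#{@seaweedfs}#{path}", "Directories", "Name")
      |> Enum.sort()
      |> Enum.each(fn dir -> traverse("#{path}#{dir}/", depth - 1) end)
    end

    # Entry point: four directory levels (year/month/day/hour) below recordings/
    # traverse("/#{exid}/snapshots/recordings/", 4)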

  • Try wrapping your copy_or_skip function implementation inside a try/rescue, rescuing any exceptions, logging them, and moving on to the next one:

    defp copy_or_skip(args, path) do
      # Your Implementation
    rescue
      error -> Logger.error("Exception caught on #{inspect(path)}\n#{inspect(error)}")
    end
    
  • You could also just go through the list of all files, and add the valid paths to some "Worker" in a Job processing library like Que or Toniq. The library will perform all the move operations and mark them successful or failed. You could then go back to see which operations failed and figure out what caused them, or automatically restart the failed ones.
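
    For example, a minimal sketch with Que (MoveWorker is a hypothetical name, and it assumes the exist_on_seaweed?/copy_or_skip helpers are made reachable from the worker). Since Que persists its jobs, failed moves can be inspected or retried without restarting the whole traversal:

    defmodule MoveWorker do
      use Que.Worker, concurrency: 4
      require Logger

      # Copies a single file path; success/failure is tracked per job.
      def perform(path) do
        exist_on_seaweed?(path)
        |> copy_or_skip(path)
      end

      def on_failure(path, error) do
        Logger.error("[move_failed] #{path}: #{inspect(error)}")
      end
    end

    # During traversal, enqueue instead of copying inline:
    # Que.add(MoveWorker, "/#{exid}/snapshots/recordings/...")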


Some more tips on improving code reliability and performance:

  • Use Stream, or better yet, Flow to divide the tasks and process them in parallel.
  • Perform the actual move operation in a separate Task process, ideally managed by a Supervisor (optionally using a pool); a sketch combining both tips follows.
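
A minimal sketch of both ideas together (treating paths as a flat list of file paths collected during traversal is an assumption): Task.Supervisor.async_stream_nolink gives Stream-style processing, a bounded pool via :max_concurrency, and supervised Task processes whose crashes come back as {:exit, reason} results instead of taking the caller down:

    # Start a supervisor for the copy tasks (normally this would live
    # in your application's supervision tree rather than inline).
    {:ok, sup} = Task.Supervisor.start_link()

    Task.Supervisor.async_stream_nolink(sup, paths, fn path ->
      exist_on_seaweed?(path)
      |> copy_or_skip(path)
    end, max_concurrency: 10, timeout: :infinity)
    |> Stream.run()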

Upvotes: 1
