Andrew Stingray

Reputation: 31

How to use COPY FROM STDIN inside an Ecto Repo.transaction in Elixir

I'm experimenting with Elixir and Ecto, and I need to run COPY FROM STDIN from my code. I found this example in postgrex.ex on GitHub:

Postgrex.transaction(pid, fn(conn) ->
  query = Postgrex.prepare!(conn, "", "COPY posts FROM STDIN", [copy_data: true])
  stream = Postgrex.stream(conn, query, [])
  Enum.into(File.stream!("posts"), stream)
end)

How can I adapt it to run inside Repo.transaction? What pid do I have to pass?

Repo.transaction fn ->
  query = "some query"
  Ecto.Adapters.SQL.query!(Repo, query, [])
  #copy from stdin, how??
end

Upvotes: 3

Views: 872

Answers (3)

Sam Houston

Reputation: 3651

The following worked for me:

csv_data = "col1,col2\r\n1,1\r\n2,2"

sql = """
COPY my_table (col1, col2)
FROM STDIN
CSV HEADER
"""

stream = Ecto.Adapters.SQL.stream(Repo, sql)

Repo.transaction(fn -> Enum.into([csv_data], stream) end)

Upvotes: 2

Johan Wärlander

Reputation: 168

As per this post by James Fish on the Elixir Forum, it should be possible:

It is possible to run a COPY FROM STDIN using Ecto.Adapters.SQL.query!/4, but you can't use a collectable/stream:

Ecto.Adapters.SQL.query!(Repo, "COPY posts FROM STDIN",
[data_as_(final)_parameter], [copy_data: true])

and:

From Ecto 2.1 the above no longer works. Instead, you must use the built-in streams:

stream = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")
TestRepo.transaction(fn -> Enum.into(data_enum, stream) end)
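If the rows live in memory rather than in a file, data_enum has to yield text that COPY understands. Here is a minimal sketch, assuming the default COPY text format (tab-separated fields, \N for NULL, one row per line). CopyRows and encode/1 are hypothetical names of my own, not part of Ecto or Postgrex:

```elixir
defmodule CopyRows do
  # Hypothetical helper, not part of Ecto or Postgrex.
  # Lazily turns a list of rows (each a list of values) into lines in
  # PostgreSQL's default COPY text format.
  def encode(rows) do
    Stream.map(rows, fn row ->
      line = row |> Enum.map(&field/1) |> Enum.join("\t")
      line <> "\n"
    end)
  end

  # NULL is written as \N in the text format.
  defp field(nil), do: "\\N"

  # Backslash, tab, newline and carriage return must be backslash-escaped.
  # The backslash itself is escaped first so later escapes are not doubled.
  defp field(value) do
    value
    |> to_string()
    |> String.replace("\\", "\\\\")
    |> String.replace("\t", "\\t")
    |> String.replace("\n", "\\n")
    |> String.replace("\r", "\\r")
  end
end
```

With that in place, the transaction above would presumably become TestRepo.transaction(fn -> Enum.into(CopyRows.encode(rows), stream) end), where rows is your own data. If the COPY statement uses the CSV option instead, the quoting rules are different and this helper does not apply.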

Upvotes: 5

Doot

Reputation: 11

I wasn't able to figure out how to do this with pure Ecto either, but why not use Postgrex directly? You're probably already using it, since it is the driver behind Ecto's Postgres adapter.

Assuming your STDIN will be CSV formatted data:

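def bulk_update(data_stream, temp_table_query, copy_data_query) do
  # Start a dedicated Postgrex connection using the repo's configuration.
  opts = MyApp.Repo.config()
  {:ok, pid} = Postgrex.start_link(opts)
  result = Postgrex.transaction(pid, &update_table(&1, data_stream, temp_table_query, copy_data_query))
  GenServer.stop(pid)
  # Return the transaction result instead of the :ok from GenServer.stop/1.
  result
end

def update_table(conn, data, create_temp_table, copy_table_data) do
  # The bang variants raise on failure, which rolls the transaction back.
  Postgrex.query!(conn, create_temp_table, [])
  query = Postgrex.prepare!(conn, "", "COPY incoming_data FROM STDIN DELIMITER ',' CSV", [copy_data: true])
  stream = Postgrex.stream(conn, query, [])
  # Push the CSV data into the temp table, then copy it to its destination.
  Enum.into(data, stream)
  Postgrex.query!(conn, copy_table_data, [])
end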

Upvotes: 1
