Reputation: 31
I'm experimenting with Elixir Ecto and now need to implement COPY FROM STDIN in my code. I found an example in postgrex.ex on GitHub:
Postgrex.transaction(pid, fn(conn) ->
query = Postgrex.prepare!(conn, "", "COPY posts FROM STDIN",[copy_data:true])
stream = Postgrex.stream(conn, query, [])
Enum.into(File.stream!("posts"), stream)
end)
How can I convert it for my needs? What pid do I have to pass?
Repo.transaction fn ->
query = "some query"
Ecto.Adapters.SQL.query!(Repo, query, [])
#copy from stdin, how??
end
Upvotes: 3
Views: 872
Reputation: 3651
The following worked for me:
csv_data = "col1,col2\r\n1,1\r\n2,2"
sql = """
COPY my_table (col1, col2)
FROM STDIN
CSV HEADER
"""
stream = Ecto.Adapters.SQL.stream(Repo, sql)
Repo.transaction(fn -> Enum.into([csv_data], stream) end)
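If the rows come from Elixir data rather than a ready-made CSV string, the same approach can be fed lazily. Below is a minimal sketch, assuming a hypothetical `CsvCopy` module, the `my_table (col1, col2)` table from the snippet above, and a repo backed by Postgrex that is passed in as `repo`. The `HEADER` option is dropped because no header line is generated here.

```elixir
defmodule CsvCopy do
  # Hypothetical helper module; these names are illustrative and not part of Ecto.

  # Encodes one row (a list of values) as a single CSV line ending in "\n".
  def encode_row(row) do
    Enum.map_join(row, ",", &encode_field/1) <> "\n"
  end

  # nil becomes an empty unquoted field, which COPY ... CSV reads as NULL by default.
  defp encode_field(nil), do: ""

  defp encode_field(value) do
    text = to_string(value)

    # Quote empty strings (so they are not read as NULL) and any field
    # containing a delimiter, a quote, or a line break; double embedded quotes.
    if text == "" or String.contains?(text, [",", "\"", "\n", "\r"]) do
      "\"" <> String.replace(text, "\"", "\"\"") <> "\""
    else
      text
    end
  end

  # Streams the encoded rows into COPY inside a transaction.
  # `repo` is assumed to be an Ecto repo using the Postgres adapter.
  def copy_rows(repo, rows) do
    sql = "COPY my_table (col1, col2) FROM STDIN CSV"
    stream = Ecto.Adapters.SQL.stream(repo, sql)

    repo.transaction(fn ->
      rows
      |> Stream.map(&encode_row/1)
      |> Enum.into(stream)
    end)
  end
end
```

It would be called as `CsvCopy.copy_rows(Repo, [[1, 1], [2, 2]])`.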
Upvotes: 2
Reputation: 168
As per this post by James Fish on the Elixir Forum, it should be possible:
It is possible to run a COPY FROM STDIN using Ecto.Adapters.SQL.query!/4 but can't use a collectable/stream:
Ecto.Adapters.SQL.query!(Repo, "COPY posts FROM STDIN", [data_as_(final)_parameter], [copy_data: true])
and:
From Ecto 2.1 the above no longer works. Instead must use built in streams:
stream = Ecto.Adapters.SQL.stream(TestRepo, "COPY posts FROM STDIN")
TestRepo.transaction(fn -> Enum.into(data_enum, stream) end)
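Applied to the file-based example from the question, this might look like the sketch below. The `CopyFromFile` module name is hypothetical, the `posts` table comes from the question, and `repo` is assumed to be an Ecto repo using the Postgres adapter. The file is expected to be in COPY's default text format, since no format option is given.

```elixir
defmodule CopyFromFile do
  # Hypothetical helper; not part of Ecto or Postgrex.

  # Streams the file at `path` line by line into COPY ... FROM STDIN.
  # The stream can only be consumed inside a transaction.
  def copy_file(repo, path) do
    stream = Ecto.Adapters.SQL.stream(repo, "COPY posts FROM STDIN")

    repo.transaction(fn ->
      path
      |> File.stream!()
      |> Enum.into(stream)
    end)
  end
end
```

A call would be `CopyFromFile.copy_file(Repo, "posts")`. For large files the transaction may need a longer `:timeout` than the default.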
Upvotes: 5
Reputation: 11
I wasn't able to figure out how to do this with pure Ecto either, but why not use Postgrex? You're probably already using it as an adapter.
Assuming your STDIN will be CSV formatted data:
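def bulk_update(data_stream, temp_table_query, copy_data_query) do
  # Open a dedicated Postgrex connection using the repo's configuration
  opts = MyApp.Repo.config()
  {:ok, pid} = Postgrex.start_link(opts)
  result = Postgrex.transaction(pid, &update_table(&1, data_stream, temp_table_query, copy_data_query))
  # Close the connection and return the transaction result
  GenServer.stop(pid)
  result
end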
def update_table(conn, data, create_temp_table, copy_table_data) do
  # query!/3 raises on failure, which rolls back the surrounding transaction
  Postgrex.query!(conn, create_temp_table, [])
  query = Postgrex.prepare!(conn, "", "COPY incoming_data FROM STDIN DELIMITER ',' CSV", [copy_data: true])
  stream = Postgrex.stream(conn, query, [])
  Enum.into(data, stream)
  Postgrex.query!(conn, copy_table_data, [])
end
Upvotes: 1