Shawn
Shawn

Reputation: 353

Riak's erlang pb client and map fetch

I am trying to store a map of registers and sets in Riak and process them using Riak's pb client. My goal here is to save the map, then extract the persisted map from the returned object and process it (in this case extract the registers, values from sets and translate to JSON). Seems like trying to use riakc_map:fetch to extract values from the returned object is not the right way??

Here's how I am creating it (key is an uuid, QP is list of ints):

M = riak:new_map(),
M1 = riak:map_update({<<"post_id">>, register},
    fun(R) -> riakc_register:set(Key, R) end, M),
M2 = riak:map_update({<<"userids">>, set},
    fun(S) ->
        [riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
    end, M2),
[...]

I then wrap it in an object, and save the the object.

Obj = riakc_obj:new(<<?POST_BUCKET>>, Key, Map) %% Map created via new_map()
{ok, Obj2} = riakc_pb_socket:put(Pid, <<?POST_BUCKET>>, Obj, [return_body])

Now this is where I run into trouble:

Post = binary_to_term(riakc_obj:get_value(Obj2)));
Keys = riakc_map:fetch_keys(Post),  %% returns []
OrigMap = riakc_map:value(Post),    %% returns []
IsKey1 = riakc_map:is_key({<<"post_id">>, register}, Post), %% false
IsKey2 = riakc_map:is_key({<<"post_id">>}, Post),   %% false

Most importantly

PostId = riakc_map:fetch({<<"post_id">>, register}, Post),

dies with

{function_clause, [{orddict,fetch, ...

Post after binary_to_term(riakc_obj:get_value(Obj2))); looks correct as far as I can tell:

< Post = {map,[],
          [{{<<"content">>,register},
            {register,<<>>,<<"<post>Hello, World!\r\n</post>">>}},
           {{<<"post_id">>,register},
            {register,<<>>,<<"238e4300-a651-11e4-86c8-6003088f077a">>}},
           {{<<"userids">>,set},
              [{set,[],[<<"-1">>],[],undefined}]},
            .....

Any help greatly apprecaited!

Upvotes: 1

Views: 349

Answers (1)

Joe
Joe

Reputation: 28316

There are a few things not quite right. This call to update

M2 = riakc_map:update({<<"userids">>, set}, 
    fun(S) ->
        [riakc_set:add_element(helpers:int_to_bin(Q), S) || Q <- QP]
    end, M1),

does not create a set with several members, it creates a list of sets each with a single member. If you are trying to construct a set of integers, use a fold instead of a list comprehension:

M2 = riakc_map:update({<<"userids">>, set}, 
    fun(S) ->
        lists:foldl(
               fun(Q, Acc) ->
                   riakc_set:add_element(helpers:int_to_bin(Q), Acc) 
               end, 
               S, QP),
    end, M1),

This doesn't actually update the value of the map, it stages updates in the record:

#map{value = [],
     updates = [{{<<"post_id">>,register},
                 {register,<<>>,<<"uuid">>}},
                {{<<"userids">>,set},
                 {set,[],
                      [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                       <<"7">>,<<"8">>,<<"9">>],
                      [],undefined}}],
     removes = [],context = undefined}

At this point, riakc_map:value returns [] because that is the value in the record. To apply the staged updates to the value, you would need to call riakc_pb_socket:modify_type which would also store the value in Riak.


The full process

Prepare the Riak cluster to use the map CRDT

  • Create a bucket type to hold a map
    root@node# riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'

  • Activate the bucket type
    root@node# riak-admin bucket-type activate maps

Create map and update Riak

1> rr(riakc_map).
[map]
2> {ok,Pid}=riakc_pb_socket:start("127.0.0.1",8087),
2> M=riakc_map:new(),
2> Key = <<"testkey">>,
2> Bucket = <<"testbucket">>,
2> Type = <<"maps">>,
2> QP = lists:seq(1,10),
2> M1 = riakc_map:update({<<"post_id">>, register},
2> fun(R) -> riakc_register:set(Key, R) end, M),
2> M2 = riakc_map:update({<<"userids">>, set},
2> fun(S) ->
2> lists:foldl(fun(Q,Acc) -> riakc_set:add_element(list_to_binary(integer_to_list(Q)), Acc) end,S, QP)
2> end, M1),
2> riakc_pb_socket:modify_type(Pid,fun(_) -> M2 end,{Type,Bucket},Key,[create]).
ok

Retrieve the map and check values

3> {ok,Map} = riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key),
3> riakc_map:value(Map).
[{{<<"post_id">>,register},<<"testkey">>},
 {{<<"userids">>,set},
  [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
   <<"7">>,<<"8">>,<<"9">>]}]
4> riakc_map:fetch_keys(Map).
[{<<"post_id">>,register},{<<"userids">>,set}]
5> riakc_map:is_key({<<"post_id">>, register}, Map).
true
6> riakc_map:is_key({<<"post_id">>}, Map).
false

For comparison, this is the value before it was stored:

7> M2.

#map{value = [],
     updates = [{{<<"post_id">>,register},
                 {register,<<>>,<<"testkey">>}},
                {{<<"userids">>,set},
                 {set,[],
                      [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                       <<"7">>,<<"8">>,<<"9">>],
                      [],undefined}}],
     removes = [],context = undefined}

And this is the final value:

8> Map.
#map{value = [{{<<"post_id">>,register},<<"testkey">>},
              {{<<"userids">>,set},
               [<<"1">>,<<"10">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,<<"6">>,
                <<"7">>,<<"8">>,<<"9">>]}],
     updates = [],removes = [],
     context = <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,
                 249,120,246,57,114,97,8,106>>}

Changing an existing map is handled with an implicit fetch:

2> riakc_pb_socket:modify_type(Pid,
                 fun(OldMap) ->
                    riakc_map:update({<<"userids">>, set},
                               fun(S) ->
                                   riakc_set:add_element(<<"100">>, S)
                               end, OldMap)
                 end,
                 {Type,Bucket},Key,[]).
ok
3> riakc_pb_socket:fetch_type(Pid,{Type,Bucket},Key).
{ok,{map,[{{<<"post_id">>,register},<<"testkey">>},
      {{<<"userids">>,set},
       [<<"1">>,<<"10">>,<<"100">>,<<"2">>,<<"3">>,<<"4">>,<<"5">>,
        <<"6">>,<<"7">>,<<"8">>,<<"9">>]}],
     [],[],
     <<131,108,0,0,0,1,104,2,109,0,0,0,8,35,9,254,249,120,
       246,57,114,97,...>>}}

Upvotes: 5

Related Questions