diff --git a/tests/verify_repl_deleteonfail.erl b/tests/verify_repl_deleteonfail.erl deleted file mode 100644 index 328e36b46..000000000 --- a/tests/verify_repl_deleteonfail.erl +++ /dev/null @@ -1,273 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- -%% @doc When replicating deletes, if a node is down in the cluster, clusters -%% will get out of sync, unless the deletion of tombstones is deferred when -%% non-primaries are unavailable - --module(verify_repl_deleteonfail). --export([confirm/0]). --import(location, [plan_and_wait/2]). - --include_lib("eunit/include/eunit.hrl"). - --define(DEFAULT_RING_SIZE, 16). --define(SNK_WORKERS, 4). --define(MAX_RESULTS, 512). --define(NUM_KEYS_PERNODE, 5000). --define(MAX_LOOOPS, 20). - - --define(CFG_REPL(SrcQueueDefns, NVal), - [{riak_kv, - [ - % Speedy AAE configuration - {anti_entropy, {off, []}}, - {tictacaae_active, active}, - {tictacaae_parallelstore, leveled_ko}, - % if backend not leveled will use parallel key-ordered - % store - {tictacaae_rebuildwait, 4}, - {tictacaae_rebuilddelay, 3600}, - {tictacaae_rebuildtick, 3600000}, - {replrtq_enablesrc, true}, - {replrtq_srcqueue, SrcQueueDefns}, - {log_readrepair, true}, - {read_repair_log, true}, - {delete_mode, immediate}, - {defer_reap_on_failure, true} - ]}, - {riak_core, - [ - {ring_creation_size, ?DEFAULT_RING_SIZE}, - {vnode_management_timer, 2000}, - {vnode_inactivity_timeout, 4000}, - {handoff_concurrency, 16}, - {default_bucket_props, - [ - {n_val, NVal}, - {allow_mult, true}, - {dvv_enabled, true} - ]} - ]}] - ). - --define(SNK_CONFIG(ClusterName, PeerList), - [{riak_kv, - [{replrtq_enablesink, true}, - {replrtq_sinkqueue, ClusterName}, - {replrtq_sinkpeers, PeerList}, - {replrtq_sinkworkers, ?SNK_WORKERS}]}]). - --define(FS_CONFIG(PeerIP, PeerPort, LocalClusterName, RemoteClusterName), - [{riak_kv, - [{ttaaefs_scope, all}, - {ttaaefs_localnval, 3}, - {ttaaefs_remotenval, 1}, - {ttaaefs_peerip, PeerIP}, - {ttaaefs_peerport, PeerPort}, - {ttaaefs_peerprotocol, pb}, - {ttaaefs_allcheck, 0}, - {ttaaefs_autocheck, 0}, - {ttaaefs_daycheck, 0}, - {ttaaefs_hourcheck, 0}, - {ttaaefs_nocheck, 24}, - {ttaaefs_maxresults, ?MAX_RESULTS}, - {ttaaefs_queuename, LocalClusterName}, - {ttaaefs_queuename_peer, RemoteClusterName}, - {ttaaefs_logrepairs, true}]}]). - -confirm() -> - [ClusterA, ClusterB] = - rt:deploy_clusters([ - {5, ?CFG_REPL("cluster_b:any", 3)}, - {1, ?CFG_REPL("cluster_a:any", 1)}]), - - lager:info("Discover Peer IP/ports and restart with peer config"), - reset_peer_config(ClusterA, ClusterB), - - lists:foreach( - fun(N) -> rt:wait_until_ready(N), rt:wait_until_pingable(N) end, - ClusterA ++ ClusterB - ), - - rt:join_cluster(ClusterA), - rt:join_cluster(ClusterB), - - ok = setup_fullsync_peer(ClusterA, hd(ClusterB)), - - ok = verify_repl_delete(ClusterA), - pass. - - -verify_repl_delete(Nodes) -> - - lager:info("Commencing object load"), - KeyLoadFun = - fun(B, V) -> - fun(Node, KeyCount) -> - lager:info("Loading from key ~w on node ~w", [KeyCount, Node]), - KVs = - test_data( - KeyCount + 1, KeyCount + ?NUM_KEYS_PERNODE, V), - ok = write_data(Node, B, KVs, []), - KeyCount + ?NUM_KEYS_PERNODE - end - end, - - lists:foldl(KeyLoadFun(<<"B1">>, <<"v1">>), 1, Nodes), - lager:info("Loaded ~w objects", [?NUM_KEYS_PERNODE * length(Nodes)]), - wait_for_queues_to_drain(Nodes, cluster_b), - lists:foldl(KeyLoadFun(<<"B2">>, <<"v1">>), 1, Nodes), - lager:info("Loaded ~w objects", [?NUM_KEYS_PERNODE * length(Nodes)]), - wait_for_queues_to_drain(Nodes, cluster_b), - lists:foldl(KeyLoadFun(<<"B3">>, <<"v1">>), 1, Nodes), - lager:info("Loaded ~w objects", [?NUM_KEYS_PERNODE * length(Nodes)]), - wait_for_queues_to_drain(Nodes, cluster_b), - lists:foldl(KeyLoadFun(<<"B4">>, <<"v1">>), 1, Nodes), - lager:info("Loaded ~w objects", [?NUM_KEYS_PERNODE * length(Nodes)]), - wait_for_queues_to_drain(Nodes, cluster_b), - - - lager:info( - "Stopping a node - now tombstones will not be auto-reaped in A"), - FiddlingNode = hd(tl(Nodes)), - rt:stop_and_wait(FiddlingNode), - - lager:info("Delete all keys for B1 bucket"), - delete_data( - hd(Nodes), <<"B1">>, lists:seq(1, ?NUM_KEYS_PERNODE * length(Nodes))), - lager:info("Delete attempts completed"), - wait_for_queues_to_drain(Nodes -- [FiddlingNode], cluster_b), - - rt:start_and_wait(FiddlingNode), - - wait_for_reaps_to_drain(Nodes -- [FiddlingNode]), - - LoopsToSync = count_loops_to_sync(Nodes), - - ?assert(is_integer(LoopsToSync)), - ?assert(LoopsToSync < 5). - - -to_key(N) -> - list_to_binary(io_lib:format("K~4..0B", [N])). - -test_data(Start, End, V) -> - Keys = [to_key(N) || N <- lists:seq(Start, End)], - [{K, <>} || K <- Keys]. - - -write_data(Node, Bucket, KVs, Opts) -> - PB = rt:pbc(Node), - [begin - O = - case riakc_pb_socket:get(PB, Bucket, K) of - {ok, Prev} -> - riakc_obj:update_value(Prev, V); - _ -> - riakc_obj:new(Bucket, K, V) - end, - ?assertMatch(ok, riakc_pb_socket:put(PB, O, Opts)) - end || {K, V} <- KVs], - riakc_pb_socket:stop(PB), - ok. - -delete_data(Node, Bucket, Ns) -> - PB = rt:pbc(Node), - lists:foreach( - fun(N) -> ok = riakc_pb_socket:delete(PB, Bucket, to_key(N)) end, Ns), - riakc_pb_socket:stop(PB), - ok. - -reset_peer_config(ClusterA, ClusterB) -> - FoldToPeerConfigPB = - fun(Node, Acc) -> - {pb, {IP, Port}} = - lists:keyfind(pb, 1, rt:connection_info(Node)), - Acc0 = case Acc of "" -> ""; _ -> Acc ++ "|" end, - Acc0 ++ IP ++ ":" ++ integer_to_list(Port) ++ ":pb" - end, - ClusterASnkPL = lists:foldl(FoldToPeerConfigPB, "", ClusterB), - ClusterBSnkPL = lists:foldl(FoldToPeerConfigPB, "", ClusterA), - ClusterASNkCfg = ?SNK_CONFIG(cluster_a, ClusterASnkPL), - ClusterBSNkCfg = ?SNK_CONFIG(cluster_b, ClusterBSnkPL), - lists:foreach( - fun(N) -> rt:set_advanced_conf(N, ClusterASNkCfg) end, ClusterA), - lists:foreach( - fun(N) -> rt:set_advanced_conf(N, ClusterBSNkCfg) end, ClusterB), - - lists:foreach( - fun(N) -> rt:wait_for_service(N, riak_kv) end, - ClusterA ++ ClusterB). - - -wait_for_queues_to_drain([], QueueName) -> - lager:info("Queue ~w drained on nodes", [QueueName]); -wait_for_queues_to_drain([N|Rest], QueueName) -> - rt:wait_until( - fun() -> - {QueueName, {0, 0, 0}} == - rpc:call(N, riak_kv_replrtq_src, length_rtq, [QueueName]) - end - ), - wait_for_queues_to_drain(Rest, QueueName). - - -wait_for_reaps_to_drain([]) -> - lager:info("Reaps drained on all nodes"); -wait_for_reaps_to_drain([N|Rest]) -> - rt:wait_until( - fun() -> - {mqueue_lengths, MQLs} = - lists:keyfind( - mqueue_lengths, - 1, - rpc:call(N, riak_kv_reaper, reap_stats, [])), - lager:info("Reap queue lengths ~w on ~w", [MQLs, N]), - lists:sum(lists:map(fun({_P, L}) -> L end, MQLs)) == 0 - end - ), - lager:info("Reaps drained fron Node ~p", [N]), - wait_for_reaps_to_drain(Rest). - -setup_fullsync_peer(ClusterA, NodeB) -> - {pb, {IP, Port}} = lists:keyfind(pb, 1, rt:connection_info(NodeB)), - ClusterACfg = ?FS_CONFIG(IP, Port, cluster_b, cluster_a), - lists:foreach( - fun(N) -> rt:set_advanced_conf(N, ClusterACfg) end, ClusterA), - - lager:info("Waiting for convergence."), - rt:wait_until_ring_converged(ClusterA), - lists:foreach( - fun(N) -> rt:wait_for_service(N, riak_kv) end, ClusterA). - -count_loops_to_sync(Nodes) -> - count_loops_to_sync(Nodes, 1). - -count_loops_to_sync(_Nodes, ?MAX_LOOOPS) -> - false; -count_loops_to_sync(Nodes, L) -> - R = rpc:call(hd(Nodes), riak_client, ttaaefs_fullsync, [all_check, 60]), - lager:info("Full sync A -> B ~p", [R]), - case R of - {root_compare, 0} -> - L; - _ -> - wait_for_queues_to_drain(Nodes, cluster_b), - count_loops_to_sync(Nodes, L + 1) - end.