Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rel/overlay/etc/default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,15 @@ url = {{nouveau_url}}
; Scanner settings to skip dbs and docs would also work:
;[couch_quickjs_scanner_plugin.skip_{dbs,ddoc,docs}]

[couch_auto_purge_plugin]
; The maximum number of id/rev pairs the plugin will attempt to purge
; in one request.
;max_batch_size = 500
; The default time-to-live, measured in seconds, before a
; deleted document is eligible to be purged by the plugin.
; Defaults to undefined, which disables auto purging.
;deleted_document_ttl =

[chttpd_auth_lockout]
; CouchDB can temporarily lock out IP addresses that repeatedly fail authentication
; mode can be set to one of three recognised values;
Expand Down
33 changes: 32 additions & 1 deletion src/chttpd/src/chttpd_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
handle_view_cleanup_req/2,
update_doc/4,
http_code_from_status/1,
handle_partition_req/2
handle_partition_req/2,
handle_auto_purge_req/2
]).

-import(
Expand Down Expand Up @@ -390,6 +391,36 @@ update_partition_stats(PathParts) ->
ok
end.

% HTTP handler for the `_auto_purge` database endpoint.
%
% GET replies with the database's auto purge properties as a JSON object.
% PUT reads a JSON object from the request body, validates it, stores it as
% the database's auto purge properties and replies 202 with {"ok": true}.
% An `{error, Reason}` from fabric is passed on to chttpd:send_error/2.
% Every other method is answered with "method not allowed".
handle_auto_purge_req(#httpd{method = 'GET'} = Req, Db) ->
    case fabric:get_auto_purge_props(Db) of
        {error, Reason} ->
            chttpd:send_error(Req, Reason);
        {ok, Props} ->
            send_json(Req, {Props})
    end;
handle_auto_purge_req(#httpd{method = 'PUT'} = Req, Db) ->
    {Props} = chttpd:json_body_obj(Req),
    % Throws {bad_request, _} for unknown or ill-typed properties.
    validate_auto_purge_props(Props),
    case fabric:set_auto_purge_props(Db, Props) of
        {error, Reason} ->
            chttpd:send_error(Req, Reason);
        ok ->
            send_json(Req, 202, {[{ok, true}]})
    end;
handle_auto_purge_req(#httpd{} = Req, _Db) ->
    send_method_not_allowed(Req, "GET,PUT,HEAD").

% Validate a decoded auto purge property list.
%
% Returns `ok` when every entry is `{<<"deleted_document_ttl">>, Integer}`
% (an empty list is valid). Throws `{bad_request, Message}` when the TTL is
% not an integer, when an unknown key is present, or when the argument is not
% a list of 2-tuples.
validate_auto_purge_props([]) ->
    ok;
validate_auto_purge_props([{Key, Value} | Rest]) ->
    case Key of
        <<"deleted_document_ttl">> when is_integer(Value) ->
            validate_auto_purge_props(Rest);
        <<"deleted_document_ttl">> ->
            throw({bad_request, <<"deleted_document_ttl must be an integer">>});
        _Unknown ->
            throw({bad_request, <<"invalid auto purge property">>})
    end;
validate_auto_purge_props(_Malformed) ->
    throw({bad_request, <<"malformed auto purge body">>}).

handle_design_req(
#httpd{
path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | _Rest]
Expand Down
1 change: 1 addition & 0 deletions src/chttpd/src/chttpd_httpd_handlers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ db_handler(<<"_design">>) -> fun chttpd_db:handle_design_req/2;
db_handler(<<"_partition">>) -> fun chttpd_db:handle_partition_req/2;
db_handler(<<"_temp_view">>) -> fun chttpd_view:handle_temp_view_req/2;
db_handler(<<"_changes">>) -> fun chttpd_db:handle_changes_req/2;
db_handler(<<"_auto_purge">>) -> fun chttpd_db:handle_auto_purge_req/2;
db_handler(_) -> no_match.

design_handler(<<"_view">>) -> fun chttpd_view:handle_view_req/3;
Expand Down
158 changes: 158 additions & 0 deletions src/couch/src/couch_auto_purge_plugin.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_auto_purge_plugin).
-behaviour(couch_scanner_plugin).

-export([
start/2,
resume/2,
complete/1,
checkpoint/1,
db/2,
db_opened/2,
db_closing/2,
doc_fdi/3
]).

-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
-include_lib("stdlib/include/assert.hrl").

% Scanner plugin callback: begin a new scan.
% Builds the initial plugin state for ScanId, logs the start and returns
% `{ok, State}`. The second argument only has to be a map; it is not read.
start(ScanId, #{}) ->
    State = init_config(ScanId),
    ?INFO("Starting.", [], State),
    {ok, State}.

% Scanner plugin callback: continue a scan.
% Rebuilds the plugin state from scratch for ScanId, logs the resume and
% returns `{ok, State}`. The second argument only has to be a map; it is
% not read.
resume(ScanId, #{}) ->
    State = init_config(ScanId),
    ?INFO("Resuming.", [], State),
    {ok, State}.

% Scanner plugin callback: the scan has finished.
% Logs completion and returns an empty map alongside `ok`.
complete(State) ->
    ?INFO("Completed", [], State),
    {ok, #{}}.

% Scanner plugin callback: produce checkpoint data.
% Nothing from the state is kept; the result is always an empty map.
checkpoint(_State) ->
    {ok, #{}}.

% Scanner plugin callback: decide whether DbName should be scanned.
% Returns `{skip, St}` when no TTL applies to the database; otherwise stores
% the integer TTL in the state under `ttl` and returns `{ok, St1}`.
db(St, DbName) ->
    case ttl(St, DbName) of
        undefined ->
            {skip, St};
        Seconds when is_integer(Seconds) ->
            {ok, St#{ttl => Seconds}}
    end.

% Scanner plugin callback: a database shard has been opened.
%
% Uses the shard's time-seq structure to find the sequence that corresponds
% to "current timestamp minus TTL" and limits the scan to that point via an
% `end_key` option. When couch_time_seq:since/2 answers `now`, no end key is
% set. Resets the per-database purge counter and remembers the end sequence.
%
% Returns `{0, ChangeOpts, St1}`.
% NOTE(review): the leading 0 is presumably the start sequence expected by
% couch_scanner_plugin — confirm against the behaviour definition.
db_opened(#{} = St, Db) ->
    #{ttl := TTL} = St,
    TimeSeq = couch_db:get_time_seq(Db),
    Cutoff = couch_time_seq:timestamp() - TTL,
    EndSeq = couch_time_seq:since(TimeSeq, Cutoff),
    ChangeOpts =
        case EndSeq of
            now -> [];
            _ -> [{end_key, EndSeq}]
        end,
    ?INFO("scanning for deleted documents in ~s up to ~p", [couch_db:name(Db), EndSeq], meta(St)),
    {0, ChangeOpts, St#{count => 0, end_seq => EndSeq}}.

% Scanner plugin callback: a database shard is about to be closed.
% Logs the `count` accumulated in the state for this database and returns
% the state unchanged.
db_closing(#{} = St, Db) ->
    #{count := Purged} = St,
    ?INFO("purged ~B deleted documents from ~s", [Purged, couch_db:name(Db)], meta(St)),
    {ok, St}.

% Scanner plugin callback: inspect one #full_doc_info{}.
% A deleted document is purged (after asserting its update_seq does not
% exceed the end sequence recorded in the state); any other document leaves
% the state untouched. Always returns `{ok, State}`.
doc_fdi(#{} = State, #full_doc_info{deleted = true} = FDI, Db) ->
    #{end_seq := EndSeq} = State,
    ?assert(
        FDI#full_doc_info.update_seq =< EndSeq, "FDI update_seq should not be greater than end seq"
    ),
    Purged = purge(State, FDI, Db),
    {ok, Purged};
doc_fdi(#{} = State, #full_doc_info{}, _Db) ->
    {ok, State}.

% Purge every leaf revision of the document described by FDI.
%
% Reads "max_batch_size" (default 500) from this module's config section and
% hands the id and revisions to purge/5, which issues one purge request per
% batch of at most that many revisions. Returns the updated state.
purge(#{} = St, #full_doc_info{} = FDI, Db) ->
    {Id, Revs} = fdi_to_idrevs(FDI),
    Configured = config:get_integer(atom_to_list(?MODULE), "max_batch_size", 500),
    % A batch size below 1 is unusable: lists:split(0, Revs) never shrinks the
    % remaining list, so purge/5 would recurse forever, and a negative size
    % makes lists:split/2 raise badarg. Treat such values as 1.
    MaxBatchSize = max(1, Configured),
    purge(St, Id, Revs, MaxBatchSize, Db).

% Purge Revs of document Id in batches of at most MaxBatchSize revisions.
%
% First clause: the revisions fit in one batch. The purge runs through
% fabric_util:isolate/2 with the fabric request timeout. On an `ok` or
% `accepted` reply the rate limiter is updated with the number of results,
% the process sleeps for the wait the limiter returns, and `count` grows by
% the number of results. Any other reply, or any exception, is logged as a
% warning and the state is returned unchanged.
%
% Second clause: too many revisions. Split off the first MaxBatchSize, purge
% them, then continue with the rest using the state from the first batch.
purge(#{} = St, Id, Revs, MaxBatchSize, Db) when length(Revs) =< MaxBatchSize ->
    DbName = mem3:dbname(couch_db:name(Db)),
    DoPurge = fun() -> fabric:purge_docs(DbName, [{Id, Revs}], [?ADMIN_CTX]) end,
    Timeout = fabric_util:request_timeout(),
    try fabric_util:isolate(DoPurge, Timeout) of
        {Health, Results} when Health == ok; Health == accepted ->
            #{count := Count, limiter := Limiter0} = St,
            NumResults = length(Results),
            {Wait, Limiter1} = couch_scanner_rate_limiter:update(
                Limiter0, doc_write, NumResults
            ),
            timer:sleep(Wait),
            St#{count => Count + NumResults, limiter => Limiter1};
        Unexpected ->
            ?WARN(
                "Failed to purge deleted documents in ~s/~s for reason ~p",
                [DbName, Id, Unexpected],
                meta(St)
            ),
            St
    catch
        Class:Reason ->
            ?WARN(
                "Failed to purge deleted documents in ~s/~s for reason ~p:~p",
                [DbName, Id, Class, Reason],
                meta(St)
            ),
            St
    end;
purge(#{} = St0, Id, Revs, MaxBatchSize, Db) ->
    {Batch, Remaining} = lists:split(MaxBatchSize, Revs),
    St1 = purge(St0, Id, Batch, MaxBatchSize, Db),
    purge(St1, Id, Remaining, MaxBatchSize, Db).

% Turn a #full_doc_info{} into `{DocId, RevStrings}`, where RevStrings holds
% one couch_doc:rev_to_str/1 result per leaf of the document's revision tree.
fdi_to_idrevs(#full_doc_info{id = Id, rev_tree = RevTree}) ->
    Leafs = couch_key_tree:get_all_leafs(RevTree),
    RevStrs = [
        couch_doc:rev_to_str({Pos, RevId})
     || {#leaf{}, {Pos, [RevId | _]}} <- Leafs
    ],
    {Id, RevStrs}.

% Build the initial plugin state: the scan id under `sid` and a rate limiter
% obtained from couch_scanner_rate_limiter:get/0 under `limiter`.
init_config(ScanId) ->
    Limiter = couch_scanner_rate_limiter:get(),
    #{sid => ScanId, limiter => Limiter}.

% Reduce the plugin state to the metadata map passed to the log macros:
% only the `sid` key is kept.
meta(#{sid := _} = St) ->
    maps:with([sid], St).

% Resolve the deleted-document TTL that applies to DbName.
%
% The per-database "deleted_document_ttl" auto purge property wins when it is
% an integer. Otherwise the "deleted_document_ttl" value from this module's
% config section is used. Returns an integer, or `undefined` when neither is
% set, in which case the caller skips the database.
%
% A per-database value that is present but not an integer, or a failure to
% fetch the properties, is logged as a warning and treated as "not set".
% A config value that is not a valid integer string makes list_to_integer/1
% raise badarg.
ttl(St, DbName) ->
    DefaultTTL = config:get(atom_to_list(?MODULE), "deleted_document_ttl"),
    DbTTL =
        case fabric:get_auto_purge_props(DbName) of
            {ok, AutoPurgeProps} ->
                case couch_util:get_value(<<"deleted_document_ttl">>, AutoPurgeProps) of
                    TTL when is_integer(TTL) ->
                        TTL;
                    undefined ->
                        % No per-database override is the normal case, so it
                        % is not worth a warning for every database.
                        undefined;
                    Else ->
                        ?WARN(
                            "TTL in ~s as ttl was '~p', not integer",
                            [DbName, Else],
                            meta(St)
                        ),
                        undefined
                end;
            {error, Reason} ->
                ?WARN(
                    "Failed to fetch ttl in ~s for reason ~p",
                    [DbName, Reason],
                    meta(St)
                ),
                undefined
        end,
    if
        DbTTL /= undefined ->
            DbTTL;
        % config:get/2 yields `undefined` when the key is not set; passing
        % that to list_to_integer/1 would crash instead of disabling purging.
        DefaultTTL /= undefined ->
            list_to_integer(DefaultTTL);
        true ->
            undefined
    end.
118 changes: 118 additions & 0 deletions src/couch/test/eunit/couch_auto_purge_plugin_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_auto_purge_plugin_tests).

-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").

-define(PLUGIN, couch_auto_purge_plugin).

% EUnit generator: runs each test case below with its own setup/teardown
% pair and a 10 second timeout.
couch_quickjs_scanner_plugin_test_() ->
    Tests = [
        ?TDEF_FE(t_no_auto_purge_by_default, 10),
        ?TDEF_FE(t_auto_purge_after_config_ttl, 10),
        ?TDEF_FE(t_auto_purge_after_db_ttl, 10)
    ],
    {foreach, fun setup/0, fun teardown/1, Tests}.

% Per-test fixture: loads and mocks (passthrough) the plugin and the scanner
% modules, starts couch with fabric and couch_scanner, creates a temporary
% q=2, n=1 database and limits the plugin to one revision per purge request.
% Returns `{Ctx, DbName}` for teardown/1 and the test cases.
setup() ->
    {module, _} = code:ensure_loaded(?PLUGIN),
    meck:new(?PLUGIN, [passthrough]),
    meck:new(couch_scanner_server, [passthrough]),
    meck:new(couch_scanner_util, [passthrough]),
    Ctx = test_util:start_couch([fabric, couch_scanner]),
    DbName = ?tempdb(),
    ok = fabric:create_db(DbName, [{q, "2"}, {n, "1"}]),
    % The plugin reads "max_batch_size"; "max_batch_items" is not a key it
    % looks at, so setting that one had no effect on the code under test.
    config:set(atom_to_list(?PLUGIN), "max_batch_size", "1", false),
    reset_stats(),
    {Ctx, DbName}.

% Per-test cleanup: clears the scanner and plugin config sections, resets
% scanner checkpoints, resumes the scanner, deletes the temporary database,
% stops couch and unloads all mocks.
teardown({Ctx, DbName}) ->
    Sections = ["couch_scanner", "couch_scanner_plugins", atom_to_list(?PLUGIN)],
    lists:foreach(fun config_delete_section/1, Sections),
    couch_scanner:reset_checkpoints(),
    couch_scanner:resume(),
    fabric:delete_db(DbName),
    test_util:stop_couch(Ctx),
    meck:unload().

% With no TTL configured anywhere, enabling the plugin and letting a scan
% finish must leave the deleted document in place.
t_no_auto_purge_by_default({_Ctx, DbName}) ->
    Tombstone = #{<<"_deleted">> => true},
    ok = add_doc(DbName, <<"doc1">>, Tombstone),
    ?assertEqual(1, doc_del_count(DbName)),
    lists:foreach(fun meck:reset/1, [couch_scanner_server, ?PLUGIN]),
    % Enabling the plugin in the config is what starts the scan.
    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
    wait_exit(10000),
    ?assertEqual(1, doc_del_count(DbName)),
    ok.

% A TTL set in the plugin's config section applies to the database: after a
% scan the deleted document is gone. The TTL is large and negative so that
% the freshly deleted document is already eligible.
t_auto_purge_after_config_ttl({_Ctx, DbName}) ->
    PluginSection = atom_to_list(?PLUGIN),
    config:set(PluginSection, "deleted_document_ttl", "-1000000", false),
    Tombstone = #{<<"_deleted">> => true},
    ok = add_doc(DbName, <<"doc1">>, Tombstone),
    ?assertEqual(1, doc_del_count(DbName)),
    lists:foreach(fun meck:reset/1, [couch_scanner_server, ?PLUGIN]),
    % Enabling the plugin in the config is what starts the scan.
    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
    wait_exit(10000),
    ?assertEqual(0, doc_del_count(DbName)),
    ok.

% A TTL stored in the database's own auto purge properties applies without
% any config default: after a scan the deleted document is gone. The TTL is
% large and negative so that the freshly deleted document is already eligible.
t_auto_purge_after_db_ttl({_Ctx, DbName}) ->
    Props = [{<<"deleted_document_ttl">>, -1000000}],
    ok = fabric:set_auto_purge_props(DbName, Props),
    Tombstone = #{<<"_deleted">> => true},
    ok = add_doc(DbName, <<"doc1">>, Tombstone),
    ?assertEqual(1, doc_del_count(DbName)),
    lists:foreach(fun meck:reset/1, [couch_scanner_server, ?PLUGIN]),
    % Enabling the plugin in the config is what starts the scan.
    config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
    wait_exit(10000),
    ?assertEqual(0, doc_del_count(DbName)),
    ok.

% Bring the listed query_server counters back to zero.
% Returns the list of reset_counter/1 results, one per counter.
reset_stats() ->
    lists:map(
        fun reset_counter/1,
        [
            [couchdb, query_server, process_error_exits],
            [couchdb, query_server, process_errors],
            [couchdb, query_server, process_exits]
        ]
    ).

% Zero a single couch_stats counter by decrementing it by its current
% sample. A counter that already reads 0 is left alone; any other sample
% (negative or non-integer) raises a case_clause error.
reset_counter(Counter) ->
    case couch_stats:sample(Counter) of
        Current when is_integer(Current), Current > 0 ->
            couch_stats:decrement_counter(Counter, Current);
        0 ->
            ok
    end.

% Remove every key of a config section, without persisting the change.
% Returns the list of config:delete/3 results, one per key.
config_delete_section(Section) ->
    % config:get/1 yields {Key, Value} pairs for the section, whereas
    % config:delete/3 takes (Section, Key, Persist). Passing the pair straight
    % through would delete key Value from a section named Key and leave the
    % intended section untouched.
    [config:delete(Section, Key, false) || {Key, _Value} <- config:get(Section)].

% Write one document with the given id and body map to DbName as admin.
% Returns `ok`; fails with badmatch unless fabric replies `{ok, _}`.
add_doc(DbName, DocId, Body) ->
    Doc = mkdoc(DocId, Body),
    {ok, _} = fabric:update_doc(DbName, Doc, [?ADMIN_CTX]),
    ok.

% Build a document term from a body map: sets `_id` to Id, then round-trips
% the map through jiffy encode/decode so the result has whatever shape
% jiffy:decode/1 produces by default.
mkdoc(Id, #{} = Body) ->
    Json = jiffy:encode(Body#{<<"_id">> => Id}),
    jiffy:decode(Json).

% Block (up to MSec milliseconds) until the mocked couch_scanner_server has
% handled an 'EXIT' message in handle_info/2.
wait_exit(MSec) ->
    ExitCallArgs = [{'EXIT', '_', '_'}, '_'],
    meck:wait(couch_scanner_server, handle_info, ExitCallArgs, MSec).

% Return the `doc_del_count` entry of the database info for DbName.
doc_del_count(DbName) ->
    {ok, Info} = fabric:get_db_info(DbName),
    couch_util:get_value(doc_del_count, Info).
Loading