Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
Matrix
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Container Registry
Model registry
Operate
Environments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
TeDomum
Matrix
Commits
e48c7aac
Commit
e48c7aac
authored
7 years ago
by
Richard van der Hoff
Browse files
Options
Downloads
Patches
Plain Diff
Add transactional API to history purge
Make the purge request return quickly, and allow scripts to poll for updates.
parent
1708412f
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
docs/admin_api/purge_history_api.rst
+27
-0
27 additions, 0 deletions
docs/admin_api/purge_history_api.rst
synapse/handlers/message.py
+99
-5
99 additions, 5 deletions
synapse/handlers/message.py
synapse/rest/client/v1/admin.py
+35
-3
35 additions, 3 deletions
synapse/rest/client/v1/admin.py
with
161 additions
and
8 deletions
docs/admin_api/purge_history_api.rst
+
27
−
0
View file @
e48c7aac
...
...
@@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a
id is given, that event (and others at the same graph depth) will be retained.
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
in milliseconds.
The API starts the purge running, and returns immediately with a JSON body with
a purge id:
.. code:: json
{
"purge_id": "<opaque id>"
}
Purge status query
------------------
It is possible to poll for updates on recent purges with a second API;
``GET /_matrix/client/r0/admin/purge_history_status/<purge_id>``
(again, with a suitable ``access_token``). This API returns a JSON body like
the following:
.. code:: json
{
"status": "active"
}
The status will be one of ``active``, ``complete``, or ``failed``.
This diff is collapsed.
Click to expand it.
synapse/handlers/message.py
+
99
−
5
View file @
e48c7aac
...
...
@@ -13,7 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
twisted.internet
import
defer
from
twisted.internet
import
defer
,
reactor
from
twisted.python.failure
import
Failure
from
synapse.api.constants
import
EventTypes
,
Membership
from
synapse.api.errors
import
AuthError
,
Codes
,
SynapseError
...
...
@@ -24,9 +25,10 @@ from synapse.types import (
UserID
,
RoomAlias
,
RoomStreamToken
,
)
from
synapse.util.async
import
run_on_reactor
,
ReadWriteLock
,
Limiter
from
synapse.util.logcontext
import
preserve_fn
from
synapse.util.logcontext
import
preserve_fn
,
run_in_background
from
synapse.util.metrics
import
measure_func
from
synapse.util.frozenutils
import
unfreeze
from
synapse.util.stringutils
import
random_string
from
synapse.visibility
import
filter_events_for_client
from
synapse.replication.http.send_event
import
send_event_to_master
...
...
@@ -41,6 +43,36 @@ import ujson
logger
=
logging
.
getLogger
(
__name__
)
class
PurgeStatus
(
object
):
"""
Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
return by get_purge_status.
Attributes:
status (int): Tracks whether this request has completed. One of
STATUS_{ACTIVE,COMPLETE,FAILED}
"""
STATUS_ACTIVE
=
0
STATUS_COMPLETE
=
1
STATUS_FAILED
=
2
STATUS_TEXT
=
{
STATUS_ACTIVE
:
"
active
"
,
STATUS_COMPLETE
:
"
complete
"
,
STATUS_FAILED
:
"
failed
"
,
}
def
__init__
(
self
):
self
.
status
=
PurgeStatus
.
STATUS_ACTIVE
def
asdict
(
self
):
return
{
"
status
"
:
PurgeStatus
.
STATUS_TEXT
[
self
.
status
]
}
class
MessageHandler
(
BaseHandler
):
def
__init__
(
self
,
hs
):
...
...
@@ -51,25 +83,87 @@ class MessageHandler(BaseHandler):
self
.
pagination_lock
=
ReadWriteLock
()
self
.
_purges_in_progress_by_room
=
set
()
# map from purge id to PurgeStatus
self
.
_purges_by_id
=
{}
@defer.inlineCallbacks
def
purge_history
(
self
,
room_id
,
topological_ordering
,
delete_local_events
=
False
):
def
start_purge_history
(
self
,
room_id
,
topological_ordering
,
delete_local_events
=
False
):
"""
Start off a history purge on a room.
Args:
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
str: unique ID for this purge transaction.
"""
if
room_id
in
self
.
_purges_in_progress_by_room
:
raise
SynapseError
(
400
,
"
History purge already in progress for %s
"
%
(
room_id
,
),
)
purge_id
=
random_string
(
16
)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger
.
info
(
"
[purge] starting purge_id %s
"
,
purge_id
)
self
.
_purges_by_id
[
purge_id
]
=
PurgeStatus
()
run_in_background
(
self
.
_purge_history
,
purge_id
,
room_id
,
topological_ordering
,
delete_local_events
,
)
return
purge_id
@defer.inlineCallbacks
def
_purge_history
(
self
,
purge_id
,
room_id
,
topological_ordering
,
delete_local_events
):
"""
Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
Deferred
"""
self
.
_purges_in_progress_by_room
.
add
(
room_id
)
try
:
with
(
yield
self
.
pagination_lock
.
write
(
room_id
)):
yield
self
.
store
.
purge_history
(
room_id
,
topological_ordering
,
delete_local_events
,
)
logger
.
info
(
"
[purge] complete
"
)
self
.
_purges_by_id
[
purge_id
].
status
=
PurgeStatus
.
STATUS_COMPLETE
except
Exception
:
logger
.
error
(
"
[purge] failed: %s
"
,
Failure
().
getTraceback
().
rstrip
())
self
.
_purges_by_id
[
purge_id
].
status
=
PurgeStatus
.
STATUS_FAILED
finally
:
self
.
_purges_in_progress_by_room
.
discard
(
room_id
)
# remove the purge from the list 24 hours after it completes
def
clear_purge
():
del
self
.
_purges_by_id
[
purge_id
]
reactor
.
callLater
(
24
*
3600
,
clear_purge
)
def
get_purge_status
(
self
,
purge_id
):
"""
Get the current status of an active purge
Args:
purge_id (str): purge_id returned by start_purge_history
Returns:
PurgeStatus|None
"""
return
self
.
_purges_by_id
.
get
(
purge_id
)
@defer.inlineCallbacks
def
get_messages
(
self
,
requester
,
room_id
=
None
,
pagin_config
=
None
,
as_client_event
=
True
,
event_filter
=
None
):
...
...
This diff is collapsed.
Click to expand it.
synapse/rest/client/v1/admin.py
+
35
−
3
View file @
e48c7aac
...
...
@@ -17,7 +17,7 @@
from
twisted.internet
import
defer
from
synapse.api.constants
import
Membership
from
synapse.api.errors
import
AuthError
,
SynapseError
,
Codes
from
synapse.api.errors
import
AuthError
,
SynapseError
,
Codes
,
NotFoundError
from
synapse.types
import
UserID
,
create_requester
from
synapse.http.servlet
import
parse_json_object_from_request
...
...
@@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode
=
Codes
.
BAD_JSON
,
)
yield
self
.
handlers
.
message_handler
.
purge_history
(
purge_id
=
yield
self
.
handlers
.
message_handler
.
start_
purge_history
(
room_id
,
depth
,
delete_local_events
=
delete_local_events
,
)
defer
.
returnValue
((
200
,
{}))
defer
.
returnValue
((
200
,
{
"
purge_id
"
:
purge_id
,
}))
class
PurgeHistoryStatusRestServlet
(
ClientV1RestServlet
):
PATTERNS
=
client_path_patterns
(
"
/admin/purge_history_status/(?P<purge_id>[^/]+)
"
)
def
__init__
(
self
,
hs
):
"""
Args:
hs (synapse.server.HomeServer)
"""
super
(
PurgeHistoryStatusRestServlet
,
self
).
__init__
(
hs
)
self
.
handlers
=
hs
.
get_handlers
()
@defer.inlineCallbacks
def
on_GET
(
self
,
request
,
purge_id
):
requester
=
yield
self
.
auth
.
get_user_by_req
(
request
)
is_admin
=
yield
self
.
auth
.
is_server_admin
(
requester
.
user
)
if
not
is_admin
:
raise
AuthError
(
403
,
"
You are not a server admin
"
)
purge_status
=
self
.
handlers
.
message_handler
.
get_purge_status
(
purge_id
)
if
purge_status
is
None
:
raise
NotFoundError
(
"
purge id
'
%s
'
not found
"
%
purge_id
)
defer
.
returnValue
((
200
,
purge_status
.
asdict
()))
class
DeactivateAccountRestServlet
(
ClientV1RestServlet
):
...
...
@@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
def
register_servlets
(
hs
,
http_server
):
WhoisRestServlet
(
hs
).
register
(
http_server
)
PurgeMediaCacheRestServlet
(
hs
).
register
(
http_server
)
PurgeHistoryStatusRestServlet
(
hs
).
register
(
http_server
)
DeactivateAccountRestServlet
(
hs
).
register
(
http_server
)
PurgeHistoryRestServlet
(
hs
).
register
(
http_server
)
UsersRestServlet
(
hs
).
register
(
http_server
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment