Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pipeline_thumbnail
import (
"context"
"errors"
"fmt"
"io"
"github.com/getsentry/sentry-go"
sfstreams "github.com/t2bot/go-singleflight-streams"
"github.com/turt2live/matrix-media-repo/common"
"github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/database"
"github.com/turt2live/matrix-media-repo/pipelines/_steps/download"
"github.com/turt2live/matrix-media-repo/pipelines/_steps/thumbnails"
"github.com/turt2live/matrix-media-repo/pipelines/pipeline_download"
"github.com/turt2live/matrix-media-repo/util"
)
// sf is the package-wide singleflight group that Execute uses to coalesce
// concurrent thumbnail requests sharing the same key.
var sf = &sfstreams.Group{}
// ThumbnailOpts describes a thumbnail request. It extends the download
// options with the dimensions, method, and animation flag of the thumbnail
// being asked for.
type ThumbnailOpts struct {
	// DownloadOpts carries the underlying download behaviour (remote fetching,
	// read timeout, record-only mode, byte range) that Execute also honours.
	pipeline_download.DownloadOpts

	// Width and Height are the requested dimensions. Execute passes them
	// through thumbnails.PickNewDimensions before using them.
	Width, Height int

	// Method is the requested thumbnailing method. Execute passes it through
	// thumbnails.PickNewDimensions before using it.
	Method string

	// Animated is forwarded as-is to the thumbnail lookup and generation steps.
	Animated bool
}
// String renders the options as a single comma-separated string: the embedded
// download options first, followed by the width, height, method, and animated
// flag. Execute uses this value as part of its singleflight key.
func (o ThumbnailOpts) String() string {
	downloadPart := o.DownloadOpts.String()
	return fmt.Sprintf("%s,w=%d,h=%d,m=%s,a=%t", downloadPart, o.Width, o.Height, o.Method, o.Animated)
}
// ImpliedDownloadOpts builds the download options Execute uses when looking up
// the source media for a thumbnail. Only the remote-fetch flag and the read
// timeout are carried over from the thumbnail options; the download is always
// record-only and never ranged.
func (o ThumbnailOpts) ImpliedDownloadOpts() pipeline_download.DownloadOpts {
	var implied pipeline_download.DownloadOpts

	implied.FetchRemoteIfNeeded = o.FetchRemoteIfNeeded
	implied.BlockForReadUntil = o.BlockForReadUntil
	implied.RecordOnly = true

	// The caller's byte range applies to the thumbnail, not the source media,
	// so the range parameters are cleared here.
	implied.StartByte = -1
	implied.EndByte = -1

	return implied
}
// Execute finds, or generates if needed, the thumbnail described by opts for
// the media identified by origin and mediaId.
//
// Concurrent requests with the same origin, media ID, and options are
// coalesced through the package singleflight group, so the lookup/generation
// work runs once and its stream is shared between the waiting callers.
//
// Returns the thumbnail record and a stream of the thumbnail bytes (limited to
// opts.StartByte/opts.EndByte). When opts.RecordOnly is set the returned stream
// is nil. The request context is bounded by opts.BlockForReadUntil; the timeout
// is released when the returned stream is closed, or before returning when no
// stream is handed out. Returns common.ErrMediaNotFound when the source media
// record does not exist, and otherwise propagates errors from the individual
// steps.
func Execute(ctx rcontext.RequestContext, origin string, mediaId string, opts ThumbnailOpts) (*database.DbThumbnail, io.ReadCloser, error) {
	// Step 1: Fix the request parameters
	w, h, method, err1 := thumbnails.PickNewDimensions(ctx, opts.Width, opts.Height, opts.Method)
	if err1 != nil {
		return nil, nil, err1
	}
	opts.Width = w
	opts.Height = h
	opts.Method = method

	// Step 2: Make our context a timeout context
	var cancel context.CancelFunc
	//goland:noinspection GoVetLostCancel - we handle the function in our custom cancelCloser struct
	ctx.Context, cancel = context.WithTimeout(ctx.Context, opts.BlockForReadUntil)

	// Step 3: Join the singleflight queue
	//
	// The work function hands the record back through a 1-slot buffered channel.
	// It sends at most once, so the send never blocks and needs no goroutine.
	// If the function fails after sending, the value is simply dropped with the
	// channel. The channel is deliberately never closed, so a send can never
	// panic.
	recordCh := make(chan *database.DbThumbnail, 1)
	r, err, _ := sf.Do(fmt.Sprintf("%s/%s?%s", origin, mediaId, opts.String()), func() (io.ReadCloser, error) {
		// Step 4: Get the associated media record (without stream)
		mediaRecord, _, err := pipeline_download.Execute(ctx, origin, mediaId, opts.ImpliedDownloadOpts())
		if err != nil {
			return nil, err
		}
		if mediaRecord == nil {
			return nil, common.ErrMediaNotFound
		}

		// Step 5: Check for quarantine
		// TODO: Quarantine

		// Step 6: See if we're lucky enough to already have this thumbnail
		thumbDb := database.GetInstance().Thumbnails.Prepare(ctx)
		record, err := thumbDb.GetByParams(origin, mediaId, opts.Width, opts.Height, opts.Method, opts.Animated)
		if err != nil {
			return nil, err
		}
		if record != nil {
			recordCh <- record
			if opts.RecordOnly {
				return nil, nil
			}
			return download.OpenStream(ctx, record.Locatable, opts.StartByte, opts.EndByte)
		}

		// Step 7: Generate the thumbnail and return that
		record, r, err := thumbnails.Generate(ctx, mediaRecord, opts.Width, opts.Height, opts.Method, opts.Animated)
		if err != nil {
			return nil, err
		}
		recordCh <- record
		if opts.RecordOnly {
			// Nobody wants the bytes, so release the generated stream right away.
			r.Close()
			return nil, nil
		}

		// Step 8: Create a limited stream
		return download.CreateLimitedStream(ctx, r, opts.StartByte, opts.EndByte)
	})
	if err != nil {
		cancel()
		return nil, nil, err
	}

	// Step 9: Collect the record
	var record *database.DbThumbnail
	select {
	case record = <-recordCh:
		// Our own work function ran and supplied the record.
	default:
		// We were coalesced onto another caller's flight, so our work function
		// never ran and nothing will ever arrive on recordCh. Waiting would block
		// forever; look the record up ourselves instead.
		// NOTE(review): assumes the thumbnail is persisted by the time the shared
		// flight completes - confirm against thumbnails.Generate.
		record, err = database.GetInstance().Thumbnails.Prepare(ctx).GetByParams(origin, mediaId, opts.Width, opts.Height, opts.Method, opts.Animated)
		if err == nil && record == nil {
			err = errors.New("thumbnail record unavailable after shared thumbnail request")
		}
		if err != nil {
			if r != nil {
				r.Close()
			}
			cancel()
			return nil, nil, err
		}
	}

	if opts.RecordOnly {
		if r != nil {
			devErr := errors.New("expected no thumbnail stream, but got one anyways")
			ctx.Log.Warn(devErr)
			sentry.CaptureException(devErr)
			r.Close()
		}
		cancel()
		return record, nil, nil
	}

	// Closing the returned stream also cancels the timeout context.
	return record, util.NewCancelCloser(r, cancel), nil
}