Commit eedd3d9f authored by Edward Welch

prune interval is configurable

canary will suspend all operations on SIGINT but not exit, allowing you to shut down the canary without it being restarted by docker/kubernetes
SIGTERM will shut down everything and end the process
parent bcccbbfa
@@ -25,9 +25,9 @@ If the received log is:
* The next in the array to be received, it is removed from the array and the (current time - log timestamp) is recorded in the `response_latency` histogram; this is the expected behavior for well-behaved logs
* Not the next in the array to be received, it is removed from the array, the response time is recorded in the `response_latency` histogram, and the `out_of_order_entries` counter is incremented
* Not in the array at all, the `unexpected_entries` counter is incremented
* Not in the array at all, it is checked against a separate list of received logs to either increment the `duplicate_entries` counter or the `unexpected_entries` counter.
In the background, loki-canary also runs a timer which iterates through all the entries in the internal array, if any are older than the duration specified by the `-wait` flag (default 60s), they are removed from the array and the `missing_entries` counter is incremented
In the background, loki-canary also runs a timer which iterates through all the entries in the internal array; if any are older than the duration specified by the `-wait` flag (default 60s), they are removed from the array and the `websocket_missing_entries` counter is incremented. An additional query is then made directly to Loki for these missing entries to determine whether they were actually lost or simply never made it down the websocket. If they are not found in the follow-up query, the `missing_entries` counter is incremented.
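A minimal sketch of the matching logic described in the list above, using a plain slice of pending timestamps and simple integer counters in place of the real Prometheus metrics (all names below are illustrative, not loki-canary's actual identifiers, and latency recording is omitted):

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative counters standing in for the out_of_order_entries,
// duplicate_entries and unexpected_entries metrics.
var outOfOrder, duplicates, unexpected int

// handleReceived mirrors the decision tree above: match the received timestamp
// against the pending (sent but unconfirmed) entries, otherwise check the
// already-confirmed set to tell duplicates apart from unexpected entries.
func handleReceived(pending []time.Time, confirmed map[time.Time]bool, ts time.Time) []time.Time {
	for i, e := range pending {
		if e.Equal(ts) {
			if i != 0 {
				outOfOrder++ // received, but an older entry is still outstanding
			}
			confirmed[ts] = true
			return append(pending[:i], pending[i+1:]...)
		}
	}
	// Not pending at all: either a duplicate of a confirmed entry, or unexpected.
	if confirmed[ts] {
		duplicates++
	} else {
		unexpected++
	}
	return pending
}

func main() {
	now := time.Now()
	pending := []time.Time{now, now.Add(time.Second)}
	confirmed := map[time.Time]bool{}
	pending = handleReceived(pending, confirmed, now.Add(time.Second)) // out of order
	pending = handleReceived(pending, confirmed, now)                  // expected, in order
	pending = handleReceived(pending, confirmed, now)                  // duplicate
	fmt.Println(len(pending), outOfOrder, duplicates, unexpected)      // 0 1 1 0
}
```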
## building and running
@@ -74,6 +74,10 @@ You should also pass the `-labelname` and `-labelvalue` flags, these are used by
If you get a high number of `unexpected_entries`, you may not be waiting long enough and should increase `-wait` from 60s to something larger.
__Be cognizant__ of the relationship between `pruneinterval` and `interval`. For example, with an interval of 10ms (100 logs per second) and a prune interval of 60s, you will write 6000 logs per minute. If those logs were not received over the websocket, the canary will attempt to query Loki directly to see if they are completely lost. __However__, the query is limited to 1000 results, so you will not be able to retrieve all of those logs even if they did make it to Loki.
__Likewise__, if you lower the `pruneinterval` you risk causing a denial of service attack, as all your canaries will attempt to query for missing logs at whatever frequency `pruneinterval` is set to.
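As a rough worked example of the numbers above (a sketch only; the flag values mirror the hypothetical 10ms/60s configuration, and the 1000-result cap is the query limit mentioned in the previous paragraph):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative flag values matching the example above.
	interval := 10 * time.Millisecond // one log entry every 10ms = 100 entries/second
	pruneInterval := 60 * time.Second // missing-entry check (and follow-up query) once per minute

	entriesPerPrune := int(pruneInterval / interval)
	fmt.Println(entriesPerPrune) // 6000, far more than the 1000 results a single query can return
}
```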
All options:
```nohighlight
@@ -91,6 +95,8 @@ All options:
Loki password
-port int
Port which loki-canary should expose metrics (default 3500)
-pruneinterval duration
Frequency to check sent vs received logs, also the frequency at which queries for missing logs will be dispatched to loki (default 1m0s)
-size int
Size in bytes of each log line (default 100)
-tls
@@ -99,4 +105,4 @@ All options:
Loki username
-wait duration
Duration to wait for log entries before reporting them lost (default 1m0s)
```
\ No newline at end of file
```
@@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -29,6 +30,7 @@ func main() {
interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
size := flag.Int("size", 100, "Size in bytes of each log line")
wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost")
pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, also the frequency at which queries for missing logs will be dispatched to loki")
buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram")
flag.Parse()
@@ -42,7 +44,7 @@ func main() {
w := writer.NewWriter(os.Stdout, sentChan, *interval, *size)
r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c := comparator.NewComparator(os.Stderr, *wait, 60*time.Second, *buckets, sentChan, receivedChan, r)
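// the comparator's prune interval now comes from the -pruneinterval flag instead of being hard-coded to 60s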
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r)
http.Handle("/metrics", promhttp.Handler())
go func() {
@@ -53,11 +55,18 @@
}()
interrupt := make(chan os.Signal, 1)
terminate := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
signal.Notify(terminate, syscall.SIGTERM)
for {
select {
case <-interrupt:
_, _ = fmt.Fprintf(os.Stderr, "suspending indefinitely\n")
w.Stop()
r.Stop()
c.Stop()
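// no return or os.Exit here: the loop keeps selecting, so the process stays alive while suspended and docker/kubernetes will not restart it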
case <-terminate:
_, _ = fmt.Fprintf(os.Stderr, "shutting down\n")
w.Stop()
r.Stop()
...
@@ -95,8 +95,11 @@ func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.D
}
func (c *Comparator) Stop() {
close(c.quit)
<-c.done
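// guard so Stop can safely be called more than once (e.g. a SIGINT suspend followed by a SIGTERM shutdown) without closing quit twice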
if c.quit != nil {
close(c.quit)
<-c.done
c.quit = nil
}
}
func (c *Comparator) entrySent(time time.Time) {
...
@@ -92,8 +92,11 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
}
func (r *Reader) Stop() {
close(r.quit)
<-r.done
if r.quit != nil {
close(r.quit)
<-r.done
r.quit = nil
}
}
func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
@@ -105,8 +108,9 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
Scheme: scheme,
Host: r.addr,
Path: "/api/prom/query",
RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) + "&query=" +
url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)),
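// limit=1000 caps how many missing entries one follow-up query can confirm; see the pruneinterval note in the README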
RawQuery: fmt.Sprintf("start=%d&end=%d", start.UnixNano(), end.UnixNano()) +
"&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) +
"&limit=1000",
}
_, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())
...
@@ -41,8 +41,11 @@ func NewWriter(writer io.Writer, sentChan chan time.Time, entryInterval time.Dur
}
func (w *Writer) Stop() {
close(w.quit)
<-w.done
if w.quit != nil {
close(w.quit)
<-w.done
w.quit = nil
}
}
func (w *Writer) run() {
...