Skip to content
Snippets Groups Projects
Commit 1f1706f4 authored by Sandeep Sukhani's avatar Sandeep Sukhani Committed by Goutham Veeramachaneni
Browse files

Move live log tailing behind websocket on server (#439)

* Move live log tailing behind websocket on server

Live log tailing now requires client to just open a websocket and keep listening to it for getting live logs for matching query

* Fixed linting errors

* Unexported tailQuery querier method

* Added a delay in querying of logs to not miss delayed entries

* Renamed variable for better readability

* Some code refactoring suggested in PR review

* Fixed linter errors

* Code refactoring requested in PR review

* Disabling linting for writeHTTPErrorResponse since same value for an input is being passed from all function calls for now

* Removed a function which already existed

* Improved printing of log entries using cli and some other code refactoring

* Checking no-labels flag before printing log stream while tailing
parent 201f738b
No related branches found
No related tags found
No related merge requests found
Showing
with 2387 additions and 100 deletions
......@@ -422,6 +422,14 @@
revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf"
version = "v1.6.2"
[[projects]]
digest = "1:7b5c6e2eeaa9ae5907c391a91c132abfd5c9e8a784a341b5625e750c67e6825d"
name = "github.com/gorilla/websocket"
packages = ["."]
pruneopts = "UT"
revision = "66b9c49e59c6c48f0ffce28c2d8b8a5678502c6d"
version = "v1.4.0"
[[projects]]
branch = "master"
digest = "1:86c1210529e69d69860f2bb3ee9ccce0b595aa3f9165e7dd1388e5c612915888"
......@@ -1366,10 +1374,10 @@
"github.com/gogo/protobuf/types",
"github.com/golang/snappy",
"github.com/gorilla/mux",
"github.com/gorilla/websocket",
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc",
"github.com/hpcloud/tail",
"github.com/mwitkow/go-grpc-middleware",
"github.com/opentracing-contrib/go-stdlib/nethttp",
"github.com/opentracing/opentracing-go",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
......
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/loki/pkg/logproto"
)
......@@ -16,6 +20,7 @@ const (
queryPath = "/api/prom/query?query=%s&limit=%d&start=%d&end=%d&direction=%s&regexp=%s"
labelsPath = "/api/prom/label"
labelValuesPath = "/api/prom/label/%s/values"
tailPath = "/api/prom/tail?query=%s&regexp=%s"
)
func query(from, through time.Time, direction logproto.Direction) (*logproto.QueryResponse, error) {
......@@ -74,3 +79,31 @@ func doRequest(path string, out interface{}) error {
return json.NewDecoder(resp.Body).Decode(out)
}
func liveTailQueryConn() (*websocket.Conn, error) {
path := fmt.Sprintf(tailPath, url.QueryEscape(*queryStr), url.QueryEscape(*regexpStr))
return wsConnect(path)
}
func wsConnect(path string) (*websocket.Conn, error) {
url := *addr + path
if strings.HasPrefix(url, "https") {
url = strings.Replace(url, "https", "wss", 1)
} else if strings.HasPrefix(url, "http") {
url = strings.Replace(url, "http", "ws", 1)
}
fmt.Println(url)
h := http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(*username+":"+*password))}}
c, resp, err := websocket.DefaultDialer.Dial(url, h)
if err != nil {
if resp == nil {
return nil, err
}
buf, _ := ioutil.ReadAll(resp.Body) // nolint
return nil, fmt.Errorf("Error response from server: %s (%v)", string(buf), err)
}
return c, nil
}
......@@ -15,46 +15,46 @@ import (
)
func doQuery() {
if *tail {
tailQuery()
return
}
var (
i iter.EntryIterator
labelsCache = mustParseLabels
common labels.Labels
maxLabelsLen = 100
)
if *tail {
i = tailQuery()
} else {
end := time.Now()
start := end.Add(-*since)
d := logproto.BACKWARD
if *forward {
d = logproto.FORWARD
}
end := time.Now()
start := end.Add(-*since)
d := logproto.BACKWARD
if *forward {
d = logproto.FORWARD
}
resp, err := query(start, end, d)
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
resp, err := query(start, end, d)
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
cache, lss := parseLabels(resp)
cache, lss := parseLabels(resp)
labelsCache = func(labels string) labels.Labels {
return cache[labels]
}
common = commonLabels(lss)
i = iter.NewQueryResponseIterator(resp, d)
labelsCache := func(labels string) labels.Labels {
return cache[labels]
}
common = commonLabels(lss)
i = iter.NewQueryResponseIterator(resp, d)
if len(common) > 0 {
fmt.Println("Common labels:", color.RedString(common.String()))
}
if len(common) > 0 {
fmt.Println("Common labels:", color.RedString(common.String()))
}
for _, ls := range cache {
ls = subtract(common, ls)
len := len(ls.String())
if maxLabelsLen < len {
maxLabelsLen = len
}
for _, ls := range cache {
ls = subtract(common, ls)
len := len(ls.String())
if maxLabelsLen < len {
maxLabelsLen = len
}
}
......@@ -64,14 +64,10 @@ func doQuery() {
labels := ""
if !*noLabels {
labels = color.RedString(padLabel(ls, maxLabelsLen))
labels = padLabel(ls, maxLabelsLen)
}
fmt.Println(
color.BlueString(i.Entry().Timestamp.Format(time.RFC3339)),
labels,
strings.TrimSpace(i.Entry().Line),
)
printLogEntry(i.Entry().Timestamp, labels, i.Entry().Line)
}
if err := i.Error(); err != nil {
......@@ -79,6 +75,14 @@ func doQuery() {
}
}
func printLogEntry(ts time.Time, lbls string, line string) {
fmt.Println(
color.BlueString(ts.Format(time.RFC3339)),
color.RedString(lbls),
strings.TrimSpace(line),
)
}
func padLabel(ls labels.Labels, maxLabelsLen int) string {
labels := ls.String()
if len(labels) < maxLabelsLen {
......
package main
import (
"time"
"log"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
const tailIteratorIncrement = 10 * time.Second
func tailQuery() iter.EntryIterator {
return &tailIterator{
from: time.Now().Add(-tailIteratorIncrement),
func tailQuery() {
conn, err := liveTailQueryConn()
if err != nil {
log.Fatalf("Tailing logs failed: %+v", err)
}
}
type tailIterator struct {
from time.Time
err error
iter.EntryIterator
}
func (t *tailIterator) Next() bool {
for t.EntryIterator == nil || !t.EntryIterator.Next() {
through, now := t.from.Add(tailIteratorIncrement), time.Now()
if through.After(now) {
time.Sleep(through.Sub(now))
}
stream := new(logproto.Stream)
resp, err := query(t.from, through, logproto.FORWARD)
for {
err := conn.ReadJSON(stream)
if err != nil {
t.err = err
return false
log.Println("Error reading stream:", err)
return
}
// We store the through time such that if we don't see any entries, we will
// still make forward progress. This is overwritten by any entries we might
// see to ensure pagination works.
t.from = through
t.EntryIterator = iter.NewQueryResponseIterator(resp, logproto.FORWARD)
labels := ""
if !*noLabels {
labels = stream.Labels
}
for _, entry := range stream.Entries {
printLogEntry(entry.Timestamp, labels, entry.Line)
}
}
return true
}
func (t *tailIterator) Entry() logproto.Entry {
entry := t.EntryIterator.Entry()
t.from = entry.Timestamp.Add(1 * time.Nanosecond)
return entry
}
func (t *tailIterator) Error() error {
return t.err
}
......@@ -131,6 +131,7 @@ func (t *Loki) initQuerier() (err error) {
t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler)))
t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
t.server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.querier.TailHandler)))
return
}
......
......@@ -9,9 +9,14 @@ import (
"strings"
"time"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/grafana/loki/pkg/logproto"
)
......@@ -58,46 +63,48 @@ func directionParam(values url.Values, name string, def logproto.Direction) (log
return logproto.Direction(d), nil
}
// QueryHandler is a http.HandlerFunc for queries.
func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
query := params.Get("query")
func httpRequestToQueryRequest(httpRequest *http.Request) (*logproto.QueryRequest, error) {
params := httpRequest.URL.Query()
now := time.Now()
queryRequest := logproto.QueryRequest{
Regex: params.Get("regexp"),
Query: params.Get("query"),
}
limit, err := intParam(params, "limit", defaultQueryLimit)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.Limit = uint32(limit)
now := time.Now()
start, err := unixNanoTimeParam(params, "start", now.Add(-defaulSince))
queryRequest.Start, err = unixNanoTimeParam(params, "start", now.Add(-defaulSince))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
end, err := unixNanoTimeParam(params, "end", now)
queryRequest.End, err = unixNanoTimeParam(params, "end", now)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
direction, err := directionParam(params, "direction", logproto.BACKWARD)
queryRequest.Direction, err = directionParam(params, "direction", logproto.BACKWARD)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
request := logproto.QueryRequest{
Query: query,
Limit: uint32(limit),
Start: start,
End: end,
Direction: direction,
Regex: params.Get("regexp"),
return &queryRequest, nil
}
// QueryHandler is a http.HandlerFunc for queries.
func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
request, err := httpRequestToQueryRequest(r)
if err != nil {
server.WriteError(w, err)
return
}
level.Debug(util.Logger).Log("request", fmt.Sprintf("%+v", request))
result, err := q.Query(r.Context(), &request)
result, err := q.Query(r.Context(), request)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
......@@ -126,3 +133,54 @@ func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
// TailHandler is a http.HandlerFunc for handling tail queries.
func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
queryRequestPtr, err := httpRequestToQueryRequest(r)
if err != nil {
server.WriteError(w, err)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
return
}
defer func() {
err := conn.Close()
level.Error(util.Logger).Log("Error closing websocket", fmt.Sprintf("%v", err))
}()
// response from httpRequestToQueryRequest is a ptr, if we keep passing pointer down the call then it would stay on
// heap until connection to websocket stays open
queryRequest := *queryRequestPtr
itr := q.tailQuery(r.Context(), &queryRequest)
stream := logproto.Stream{}
for itr.Next() {
stream.Entries = []logproto.Entry{itr.Entry()}
stream.Labels = itr.Labels()
err := conn.WriteJSON(stream)
if err != nil {
level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
}
break
}
}
if err := itr.Error(); err != nil {
level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err))
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
}
}
}
package querier
import (
"context"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
// tailIteratorIncrement is for defining size of time window we want to query entries for
const tailIteratorIncrement = 10 * time.Second
// delayQuerying is for delaying querying of logs for specified seconds to not miss any late entries
const delayQuerying = 10 * time.Second
func (q *Querier) tailQuery(ctx context.Context, queryRequest *logproto.QueryRequest) iter.EntryIterator {
queryRequest.Start = time.Now().Add(-(tailIteratorIncrement + delayQuerying))
queryRequest.Direction = logproto.FORWARD
return &tailIterator{
queryRequest: queryRequest,
querier: q,
ctx: ctx,
}
}
type tailIterator struct {
queryRequest *logproto.QueryRequest
ctx context.Context
querier *Querier
entryIterator iter.EntryIterator
err error
}
func (t *tailIterator) Next() bool {
var err error
var now time.Time
for t.entryIterator == nil || !t.entryIterator.Next() {
t.queryRequest.End, now = t.queryRequest.Start.Add(tailIteratorIncrement), time.Now()
if t.queryRequest.End.After(now.Add(-delayQuerying)) {
time.Sleep(t.queryRequest.End.Sub(now.Add(-delayQuerying)))
}
t.entryIterator, err = t.query()
if err != nil {
t.err = err
return false
}
// We store the through time such that if we don't see any entries, we will
// still make forward progress. This is overwritten by any entries we might
// see to ensure pagination works.
t.queryRequest.Start = t.queryRequest.End
}
return true
}
func (t *tailIterator) Entry() logproto.Entry {
entry := t.entryIterator.Entry()
t.queryRequest.Start = entry.Timestamp.Add(1 * time.Nanosecond)
return entry
}
func (t *tailIterator) Error() error {
return t.err
}
func (t *tailIterator) Labels() string {
return t.entryIterator.Labels()
}
func (t *tailIterator) Close() error {
return t.entryIterator.Close()
}
func (t *tailIterator) query() (iter.EntryIterator, error) {
ingesterIterators, err := t.querier.queryIngesters(t.ctx, t.queryRequest)
if err != nil {
return nil, err
}
chunkStoreIterators, err := t.querier.queryStore(t.ctx, t.queryRequest)
if err != nil {
return nil, err
}
iterators := append(chunkStoreIterators, ingesterIterators...)
return iter.NewHeapIterator(iterators, t.queryRequest.Direction), nil
}
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
.idea/
*.iml
language: go
sudo: false
matrix:
include:
- go: 1.7.x
- go: 1.8.x
- go: 1.9.x
- go: 1.10.x
- go: 1.11.x
- go: tip
allow_failures:
- go: tip
script:
- go get -t -v ./...
- diff -u <(echo -n) <(gofmt -d .)
- go vet $(go list ./... | grep -v /vendor/)
- go test -v -race ./...
# This is the official list of Gorilla WebSocket authors for copyright
# purposes.
#
# Please keep the list sorted.
Gary Burd <gary@beagledreams.com>
Google LLC (https://opensource.google.com/)
Joachim Bauch <mail@joachim-bauch.de>
Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Gorilla WebSocket
Gorilla WebSocket is a [Go](http://golang.org/) implementation of the
[WebSocket](http://www.rfc-editor.org/rfc/rfc6455.txt) protocol.
[![Build Status](https://travis-ci.org/gorilla/websocket.svg?branch=master)](https://travis-ci.org/gorilla/websocket)
[![GoDoc](https://godoc.org/github.com/gorilla/websocket?status.svg)](https://godoc.org/github.com/gorilla/websocket)
### Documentation
* [API Reference](http://godoc.org/github.com/gorilla/websocket)
* [Chat example](https://github.com/gorilla/websocket/tree/master/examples/chat)
* [Command example](https://github.com/gorilla/websocket/tree/master/examples/command)
* [Client and server example](https://github.com/gorilla/websocket/tree/master/examples/echo)
* [File watch example](https://github.com/gorilla/websocket/tree/master/examples/filewatch)
### Status
The Gorilla WebSocket package provides a complete and tested implementation of
the [WebSocket](http://www.rfc-editor.org/rfc/rfc6455.txt) protocol. The
package API is stable.
### Installation
go get github.com/gorilla/websocket
### Protocol Compliance
The Gorilla WebSocket package passes the server tests in the [Autobahn Test
Suite](http://autobahn.ws/testsuite) using the application in the [examples/autobahn
subdirectory](https://github.com/gorilla/websocket/tree/master/examples/autobahn).
### Gorilla WebSocket compared with other packages
<table>
<tr>
<th></th>
<th><a href="http://godoc.org/github.com/gorilla/websocket">github.com/gorilla</a></th>
<th><a href="http://godoc.org/golang.org/x/net/websocket">golang.org/x/net</a></th>
</tr>
<tr>
<tr><td colspan="3"><a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a> Features</td></tr>
<tr><td>Passes <a href="http://autobahn.ws/testsuite/">Autobahn Test Suite</a></td><td><a href="https://github.com/gorilla/websocket/tree/master/examples/autobahn">Yes</a></td><td>No</td></tr>
<tr><td>Receive <a href="https://tools.ietf.org/html/rfc6455#section-5.4">fragmented</a> message<td>Yes</td><td><a href="https://code.google.com/p/go/issues/detail?id=7632">No</a>, see note 1</td></tr>
<tr><td>Send <a href="https://tools.ietf.org/html/rfc6455#section-5.5.1">close</a> message</td><td><a href="http://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages">Yes</a></td><td><a href="https://code.google.com/p/go/issues/detail?id=4588">No</a></td></tr>
<tr><td>Send <a href="https://tools.ietf.org/html/rfc6455#section-5.5.2">pings</a> and receive <a href="https://tools.ietf.org/html/rfc6455#section-5.5.3">pongs</a></td><td><a href="http://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages">Yes</a></td><td>No</td></tr>
<tr><td>Get the <a href="https://tools.ietf.org/html/rfc6455#section-5.6">type</a> of a received data message</td><td>Yes</td><td>Yes, see note 2</td></tr>
<tr><td colspan="3">Other Features</tr></td>
<tr><td><a href="https://tools.ietf.org/html/rfc7692">Compression Extensions</a></td><td>Experimental</td><td>No</td></tr>
<tr><td>Read message using io.Reader</td><td><a href="http://godoc.org/github.com/gorilla/websocket#Conn.NextReader">Yes</a></td><td>No, see note 3</td></tr>
<tr><td>Write message using io.WriteCloser</td><td><a href="http://godoc.org/github.com/gorilla/websocket#Conn.NextWriter">Yes</a></td><td>No, see note 3</td></tr>
</table>
Notes:
1. Large messages are fragmented in [Chrome's new WebSocket implementation](http://www.ietf.org/mail-archive/web/hybi/current/msg10503.html).
2. The application can get the type of a received data message by implementing
a [Codec marshal](http://godoc.org/golang.org/x/net/websocket#Codec.Marshal)
function.
3. The go.net io.Reader and io.Writer operate across WebSocket frame boundaries.
Read returns when the input buffer is full or a frame boundary is
encountered. Each call to Write sends a single frame message. The Gorilla
io.Reader and io.WriteCloser operate on a single WebSocket message.
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package websocket
import (
"bytes"
"context"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptrace"
"net/url"
"strings"
"time"
)
// ErrBadHandshake is returned when the server response to opening handshake is
// invalid.
var ErrBadHandshake = errors.New("websocket: bad handshake")
var errInvalidCompression = errors.New("websocket: invalid compression negotiation")
// NewClient creates a new client connection using the given net connection.
// The URL u specifies the host and request URI. Use requestHeader to specify
// the origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies
// (Cookie). Use the response.Header to get the selected subprotocol
// (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
//
// If the WebSocket handshake fails, ErrBadHandshake is returned along with a
// non-nil *http.Response so that callers can handle redirects, authentication,
// etc.
//
// Deprecated: Use Dialer instead.
func NewClient(netConn net.Conn, u *url.URL, requestHeader http.Header, readBufSize, writeBufSize int) (c *Conn, response *http.Response, err error) {
d := Dialer{
ReadBufferSize: readBufSize,
WriteBufferSize: writeBufSize,
NetDial: func(net, addr string) (net.Conn, error) {
return netConn, nil
},
}
return d.Dial(u.String(), requestHeader)
}
// A Dialer contains options for connecting to WebSocket server.
type Dialer struct {
// NetDial specifies the dial function for creating TCP connections. If
// NetDial is nil, net.Dial is used.
NetDial func(network, addr string) (net.Conn, error)
// NetDialContext specifies the dial function for creating TCP connections. If
// NetDialContext is nil, net.DialContext is used.
NetDialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// Proxy specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the
// request is aborted with the provided error.
// If Proxy is nil or returns a nil *URL, no proxy is used.
Proxy func(*http.Request) (*url.URL, error)
// TLSClientConfig specifies the TLS configuration to use with tls.Client.
// If nil, the default configuration is used.
TLSClientConfig *tls.Config
// HandshakeTimeout specifies the duration for the handshake to complete.
HandshakeTimeout time.Duration
// ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
// size is zero, then a useful default size is used. The I/O buffer sizes
// do not limit the size of the messages that can be sent or received.
ReadBufferSize, WriteBufferSize int
// WriteBufferPool is a pool of buffers for write operations. If the value
// is not set, then write buffers are allocated to the connection for the
// lifetime of the connection.
//
// A pool is most useful when the application has a modest volume of writes
// across a large number of connections.
//
// Applications should use a single pool for each unique value of
// WriteBufferSize.
WriteBufferPool BufferPool
// Subprotocols specifies the client's requested subprotocols.
Subprotocols []string
// EnableCompression specifies if the client should attempt to negotiate
// per message compression (RFC 7692). Setting this value to true does not
// guarantee that compression will be supported. Currently only "no context
// takeover" modes are supported.
EnableCompression bool
// Jar specifies the cookie jar.
// If Jar is nil, cookies are not sent in requests and ignored
// in responses.
Jar http.CookieJar
}
// Dial creates a new client connection by calling DialContext with a background context.
func (d *Dialer) Dial(urlStr string, requestHeader http.Header) (*Conn, *http.Response, error) {
return d.DialContext(context.Background(), urlStr, requestHeader)
}
var errMalformedURL = errors.New("malformed ws or wss URL")
func hostPortNoPort(u *url.URL) (hostPort, hostNoPort string) {
hostPort = u.Host
hostNoPort = u.Host
if i := strings.LastIndex(u.Host, ":"); i > strings.LastIndex(u.Host, "]") {
hostNoPort = hostNoPort[:i]
} else {
switch u.Scheme {
case "wss":
hostPort += ":443"
case "https":
hostPort += ":443"
default:
hostPort += ":80"
}
}
return hostPort, hostNoPort
}
// DefaultDialer is a dialer with all fields set to the default values.
var DefaultDialer = &Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}
// nilDialer is dialer to use when receiver is nil.
var nilDialer = *DefaultDialer
// DialContext creates a new client connection. Use requestHeader to specify the
// origin (Origin), subprotocols (Sec-WebSocket-Protocol) and cookies (Cookie).
// Use the response.Header to get the selected subprotocol
// (Sec-WebSocket-Protocol) and cookies (Set-Cookie).
//
// The context will be used in the request and in the Dialer
//
// If the WebSocket handshake fails, ErrBadHandshake is returned along with a
// non-nil *http.Response so that callers can handle redirects, authentication,
// etcetera. The response body may not contain the entire response and does not
// need to be closed by the application.
func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader http.Header) (*Conn, *http.Response, error) {
if d == nil {
d = &nilDialer
}
challengeKey, err := generateChallengeKey()
if err != nil {
return nil, nil, err
}
u, err := url.Parse(urlStr)
if err != nil {
return nil, nil, err
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
default:
return nil, nil, errMalformedURL
}
if u.User != nil {
// User name and password are not allowed in websocket URIs.
return nil, nil, errMalformedURL
}
req := &http.Request{
Method: "GET",
URL: u,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
Host: u.Host,
}
req = req.WithContext(ctx)
// Set the cookies present in the cookie jar of the dialer
if d.Jar != nil {
for _, cookie := range d.Jar.Cookies(u) {
req.AddCookie(cookie)
}
}
// Set the request headers using the capitalization for names and values in
// RFC examples. Although the capitalization shouldn't matter, there are
// servers that depend on it. The Header.Set method is not used because the
// method canonicalizes the header names.
req.Header["Upgrade"] = []string{"websocket"}
req.Header["Connection"] = []string{"Upgrade"}
req.Header["Sec-WebSocket-Key"] = []string{challengeKey}
req.Header["Sec-WebSocket-Version"] = []string{"13"}
if len(d.Subprotocols) > 0 {
req.Header["Sec-WebSocket-Protocol"] = []string{strings.Join(d.Subprotocols, ", ")}
}
for k, vs := range requestHeader {
switch {
case k == "Host":
if len(vs) > 0 {
req.Host = vs[0]
}
case k == "Upgrade" ||
k == "Connection" ||
k == "Sec-Websocket-Key" ||
k == "Sec-Websocket-Version" ||
k == "Sec-Websocket-Extensions" ||
(k == "Sec-Websocket-Protocol" && len(d.Subprotocols) > 0):
return nil, nil, errors.New("websocket: duplicate header not allowed: " + k)
case k == "Sec-Websocket-Protocol":
req.Header["Sec-WebSocket-Protocol"] = vs
default:
req.Header[k] = vs
}
}
if d.EnableCompression {
req.Header["Sec-WebSocket-Extensions"] = []string{"permessage-deflate; server_no_context_takeover; client_no_context_takeover"}
}
if d.HandshakeTimeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, d.HandshakeTimeout)
defer cancel()
}
// Get network dial function.
var netDial func(network, add string) (net.Conn, error)
if d.NetDialContext != nil {
netDial = func(network, addr string) (net.Conn, error) {
return d.NetDialContext(ctx, network, addr)
}
} else if d.NetDial != nil {
netDial = d.NetDial
} else {
netDialer := &net.Dialer{}
netDial = func(network, addr string) (net.Conn, error) {
return netDialer.DialContext(ctx, network, addr)
}
}
// If needed, wrap the dial function to set the connection deadline.
if deadline, ok := ctx.Deadline(); ok {
forwardDial := netDial
netDial = func(network, addr string) (net.Conn, error) {
c, err := forwardDial(network, addr)
if err != nil {
return nil, err
}
err = c.SetDeadline(deadline)
if err != nil {
c.Close()
return nil, err
}
return c, nil
}
}
// If needed, wrap the dial function to connect through a proxy.
if d.Proxy != nil {
proxyURL, err := d.Proxy(req)
if err != nil {
return nil, nil, err
}
if proxyURL != nil {
dialer, err := proxy_FromURL(proxyURL, netDialerFunc(netDial))
if err != nil {
return nil, nil, err
}
netDial = dialer.Dial
}
}
hostPort, hostNoPort := hostPortNoPort(u)
trace := httptrace.ContextClientTrace(ctx)
if trace != nil && trace.GetConn != nil {
trace.GetConn(hostPort)
}
netConn, err := netDial("tcp", hostPort)
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{
Conn: netConn,
})
}
if err != nil {
return nil, nil, err
}
defer func() {
if netConn != nil {
netConn.Close()
}
}()
if u.Scheme == "https" {
cfg := cloneTLSConfig(d.TLSClientConfig)
if cfg.ServerName == "" {
cfg.ServerName = hostNoPort
}
tlsConn := tls.Client(netConn, cfg)
netConn = tlsConn
var err error
if trace != nil {
err = doHandshakeWithTrace(trace, tlsConn, cfg)
} else {
err = doHandshake(tlsConn, cfg)
}
if err != nil {
return nil, nil, err
}
}
conn := newConn(netConn, false, d.ReadBufferSize, d.WriteBufferSize, d.WriteBufferPool, nil, nil)
if err := req.Write(netConn); err != nil {
return nil, nil, err
}
if trace != nil && trace.GotFirstResponseByte != nil {
if peek, err := conn.br.Peek(1); err == nil && len(peek) == 1 {
trace.GotFirstResponseByte()
}
}
resp, err := http.ReadResponse(conn.br, req)
if err != nil {
return nil, nil, err
}
if d.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
d.Jar.SetCookies(u, rc)
}
}
if resp.StatusCode != 101 ||
!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
resp.Header.Get("Sec-Websocket-Accept") != computeAcceptKey(challengeKey) {
// Before closing the network connection on return from this
// function, slurp up some of the response to aid application
// debugging.
buf := make([]byte, 1024)
n, _ := io.ReadFull(resp.Body, buf)
resp.Body = ioutil.NopCloser(bytes.NewReader(buf[:n]))
return nil, resp, ErrBadHandshake
}
for _, ext := range parseExtensions(resp.Header) {
if ext[""] != "permessage-deflate" {
continue
}
_, snct := ext["server_no_context_takeover"]
_, cnct := ext["client_no_context_takeover"]
if !snct || !cnct {
return nil, resp, errInvalidCompression
}
conn.newCompressionWriter = compressNoContextTakeover
conn.newDecompressionReader = decompressNoContextTakeover
break
}
resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
conn.subprotocol = resp.Header.Get("Sec-Websocket-Protocol")
netConn.SetDeadline(time.Time{})
netConn = nil // to avoid close in defer.
return conn, resp, nil
}
func doHandshake(tlsConn *tls.Conn, cfg *tls.Config) error {
if err := tlsConn.Handshake(); err != nil {
return err
}
if !cfg.InsecureSkipVerify {
if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
return err
}
}
return nil
}
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.8
package websocket
import "crypto/tls"
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return cfg.Clone()
}
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.8
package websocket
import "crypto/tls"
// cloneTLSConfig clones all public fields except the fields
// SessionTicketsDisabled and SessionTicketKey. This avoids copying the
// sync.Mutex in the sync.Once and makes it safe to call cloneTLSConfig on a
// config in active use.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
}
return &tls.Config{
Rand: cfg.Rand,
Time: cfg.Time,
Certificates: cfg.Certificates,
NameToCertificate: cfg.NameToCertificate,
GetCertificate: cfg.GetCertificate,
RootCAs: cfg.RootCAs,
NextProtos: cfg.NextProtos,
ServerName: cfg.ServerName,
ClientAuth: cfg.ClientAuth,
ClientCAs: cfg.ClientCAs,
InsecureSkipVerify: cfg.InsecureSkipVerify,
CipherSuites: cfg.CipherSuites,
PreferServerCipherSuites: cfg.PreferServerCipherSuites,
ClientSessionCache: cfg.ClientSessionCache,
MinVersion: cfg.MinVersion,
MaxVersion: cfg.MaxVersion,
CurvePreferences: cfg.CurvePreferences,
}
}
// Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package websocket
import (
"compress/flate"
"errors"
"io"
"strings"
"sync"
)
const (
minCompressionLevel = -2 // flate.HuffmanOnly not defined in Go < 1.6
maxCompressionLevel = flate.BestCompression
defaultCompressionLevel = 1
)
var (
flateWriterPools [maxCompressionLevel - minCompressionLevel + 1]sync.Pool
flateReaderPool = sync.Pool{New: func() interface{} {
return flate.NewReader(nil)
}}
)
func decompressNoContextTakeover(r io.Reader) io.ReadCloser {
const tail =
// Add four bytes as specified in RFC
"\x00\x00\xff\xff" +
// Add final block to squelch unexpected EOF error from flate reader.
"\x01\x00\x00\xff\xff"
fr, _ := flateReaderPool.Get().(io.ReadCloser)
fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
return &flateReadWrapper{fr}
}
func isValidCompressionLevel(level int) bool {
return minCompressionLevel <= level && level <= maxCompressionLevel
}
func compressNoContextTakeover(w io.WriteCloser, level int) io.WriteCloser {
p := &flateWriterPools[level-minCompressionLevel]
tw := &truncWriter{w: w}
fw, _ := p.Get().(*flate.Writer)
if fw == nil {
fw, _ = flate.NewWriter(tw, level)
} else {
fw.Reset(tw)
}
return &flateWriteWrapper{fw: fw, tw: tw, p: p}
}
// truncWriter is an io.Writer that writes all but the last four bytes of the
// stream to another io.Writer.
type truncWriter struct {
w io.WriteCloser
n int
p [4]byte
}
func (w *truncWriter) Write(p []byte) (int, error) {
n := 0
// fill buffer first for simplicity.
if w.n < len(w.p) {
n = copy(w.p[w.n:], p)
p = p[n:]
w.n += n
if len(p) == 0 {
return n, nil
}
}
m := len(p)
if m > len(w.p) {
m = len(w.p)
}
if nn, err := w.w.Write(w.p[:m]); err != nil {
return n + nn, err
}
copy(w.p[:], w.p[m:])
copy(w.p[len(w.p)-m:], p[len(p)-m:])
nn, err := w.w.Write(p[:len(p)-m])
return n + nn, err
}
type flateWriteWrapper struct {
fw *flate.Writer
tw *truncWriter
p *sync.Pool
}
func (w *flateWriteWrapper) Write(p []byte) (int, error) {
if w.fw == nil {
return 0, errWriteClosed
}
return w.fw.Write(p)
}
func (w *flateWriteWrapper) Close() error {
if w.fw == nil {
return errWriteClosed
}
err1 := w.fw.Flush()
w.p.Put(w.fw)
w.fw = nil
if w.tw.p != [4]byte{0, 0, 0xff, 0xff} {
return errors.New("websocket: internal error, unexpected bytes at end of flate stream")
}
err2 := w.tw.w.Close()
if err1 != nil {
return err1
}
return err2
}
type flateReadWrapper struct {
fr io.ReadCloser
}
func (r *flateReadWrapper) Read(p []byte) (int, error) {
if r.fr == nil {
return 0, io.ErrClosedPipe
}
n, err := r.fr.Read(p)
if err == io.EOF {
// Preemptively place the reader back in the pool. This helps with
// scenarios where the application does not call NextReader() soon after
// this final read.
r.Close()
}
return n, err
}
func (r *flateReadWrapper) Close() error {
if r.fr == nil {
return io.ErrClosedPipe
}
err := r.fr.Close()
flateReaderPool.Put(r.fr)
r.fr = nil
return err
}
This diff is collapsed.
// Copyright 2016 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.8
package websocket
import "net"
func (c *Conn) writeBufs(bufs ...[]byte) error {
b := net.Buffers(bufs)
_, err := b.WriteTo(c.conn)
return err
}
// Copyright 2016 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.8
package websocket
func (c *Conn) writeBufs(bufs ...[]byte) error {
for _, buf := range bufs {
if len(buf) > 0 {
if _, err := c.conn.Write(buf); err != nil {
return err
}
}
}
return nil
}
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package websocket implements the WebSocket protocol defined in RFC 6455.
//
// Overview
//
// The Conn type represents a WebSocket connection. A server application calls
// the Upgrader.Upgrade method from an HTTP request handler to get a *Conn:
//
// var upgrader = websocket.Upgrader{
// ReadBufferSize: 1024,
// WriteBufferSize: 1024,
// }
//
// func handler(w http.ResponseWriter, r *http.Request) {
// conn, err := upgrader.Upgrade(w, r, nil)
// if err != nil {
// log.Println(err)
// return
// }
// ... Use conn to send and receive messages.
// }
//
// Call the connection's WriteMessage and ReadMessage methods to send and
// receive messages as a slice of bytes. This snippet of code shows how to echo
// messages using these methods:
//
// for {
// messageType, p, err := conn.ReadMessage()
// if err != nil {
// log.Println(err)
// return
// }
// if err := conn.WriteMessage(messageType, p); err != nil {
// log.Println(err)
// return
// }
// }
//
// In above snippet of code, p is a []byte and messageType is an int with value
// websocket.BinaryMessage or websocket.TextMessage.
//
// An application can also send and receive messages using the io.WriteCloser
// and io.Reader interfaces. To send a message, call the connection NextWriter
// method to get an io.WriteCloser, write the message to the writer and close
// the writer when done. To receive a message, call the connection NextReader
// method to get an io.Reader and read until io.EOF is returned. This snippet
// shows how to echo messages using the NextWriter and NextReader methods:
//
// for {
// messageType, r, err := conn.NextReader()
// if err != nil {
// return
// }
// w, err := conn.NextWriter(messageType)
// if err != nil {
// return err
// }
// if _, err := io.Copy(w, r); err != nil {
// return err
// }
// if err := w.Close(); err != nil {
// return err
// }
// }
//
// Data Messages
//
// The WebSocket protocol distinguishes between text and binary data messages.
// Text messages are interpreted as UTF-8 encoded text. The interpretation of
// binary messages is left to the application.
//
// This package uses the TextMessage and BinaryMessage integer constants to
// identify the two data message types. The ReadMessage and NextReader methods
// return the type of the received message. The messageType argument to the
// WriteMessage and NextWriter methods specifies the type of a sent message.
//
// It is the application's responsibility to ensure that text messages are
// valid UTF-8 encoded text.
//
// Control Messages
//
// The WebSocket protocol defines three types of control messages: close, ping
// and pong. Call the connection WriteControl, WriteMessage or NextWriter
// methods to send a control message to the peer.
//
// Connections handle received close messages by calling the handler function
// set with the SetCloseHandler method and by returning a *CloseError from the
// NextReader, ReadMessage or the message Read method. The default close
// handler sends a close message to the peer.
//
// Connections handle received ping messages by calling the handler function
// set with the SetPingHandler method. The default ping handler sends a pong
// message to the peer.
//
// Connections handle received pong messages by calling the handler function
// set with the SetPongHandler method. The default pong handler does nothing.
// If an application sends ping messages, then the application should set a
// pong handler to receive the corresponding pong.
//
// The control message handler functions are called from the NextReader,
// ReadMessage and message reader Read methods. The default close and ping
// handlers can block these methods for a short time when the handler writes to
// the connection.
//
// The application must read the connection to process close, ping and pong
// messages sent from the peer. If the application is not otherwise interested
// in messages from the peer, then the application should start a goroutine to
// read and discard messages from the peer. A simple example is:
//
// func readLoop(c *websocket.Conn) {
// for {
// if _, _, err := c.NextReader(); err != nil {
// c.Close()
// break
// }
// }
// }
//
// Concurrency
//
// Connections support one concurrent reader and one concurrent writer.
//
// Applications are responsible for ensuring that no more than one goroutine
// calls the write methods (NextWriter, SetWriteDeadline, WriteMessage,
// WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and
// that no more than one goroutine calls the read methods (NextReader,
// SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler)
// concurrently.
//
// The Close and WriteControl methods can be called concurrently with all other
// methods.
//
// Origin Considerations
//
// Web browsers allow Javascript applications to open a WebSocket connection to
// any host. It's up to the server to enforce an origin policy using the Origin
// request header sent by the browser.
//
// The Upgrader calls the function specified in the CheckOrigin field to check
// the origin. If the CheckOrigin function returns false, then the Upgrade
// method fails the WebSocket handshake with HTTP status 403.
//
// If the CheckOrigin field is nil, then the Upgrader uses a safe default: fail
// the handshake if the Origin request header is present and the Origin host is
// not equal to the Host request header.
//
// The deprecated package-level Upgrade function does not perform origin
// checking. The application is responsible for checking the Origin header
// before calling the Upgrade function.
//
// Compression EXPERIMENTAL
//
// Per message compression extensions (RFC 7692) are experimentally supported
// by this package in a limited capacity. Setting the EnableCompression option
// to true in Dialer or Upgrader will attempt to negotiate per message deflate
// support.
//
// var upgrader = websocket.Upgrader{
// EnableCompression: true,
// }
//
// If compression was successfully negotiated with the connection's peer, any
// message received in compressed form will be automatically decompressed.
// All Read methods will return uncompressed bytes.
//
// Per message compression of messages written to a connection can be enabled
// or disabled by calling the corresponding Conn method:
//
// conn.EnableWriteCompression(false)
//
// Currently this package does not support compression with "context takeover".
// This means that messages must be compressed and decompressed in isolation,
// without retaining sliding window or dictionary state across messages. For
// more details refer to RFC 7692.
//
// Use of compression is experimental and may result in decreased performance.
package websocket
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment