diff --git a/docs/operations.md b/docs/operations.md
index 1103fedc6567e44250b2cf9516a21adf2f85e24f..d6807745f0c3dbb0560e1095093e4336ef842e61 100644
--- a/docs/operations.md
+++ b/docs/operations.md
@@ -39,7 +39,11 @@ Promtail metrics:
 - `promtail_read_bytes_total` Number of bytes read.
 - `promtail_read_lines_total` Number of lines read.
 - `promtail_request_duration_seconds_count` Number of send requests.
+- `promtail_encoded_bytes_total` Number of bytes encoded and ready to send.
 - `promtail_sent_bytes_total` Number of bytes sent.
+- `promtail_dropped_bytes_total` Number of bytes dropped because failed to be sent to the ingester after all retries.
+- `promtail_sent_entries_total` Number of log entries sent to the ingester.
+- `promtail_dropped_entries_total` Number of log entries dropped because failed to be sent to the ingester after all retries.
 
 Most of these metrics are counters and should continuously increase during normal operations:
 
diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go
index 0cf478aadf0b7726b1a30ee9169e5c7206669674..0684fe1f30b2f328496af055f0876b154a1792a2 100644
--- a/pkg/promtail/client/client.go
+++ b/pkg/promtail/client/client.go
@@ -40,6 +40,21 @@ var (
 		Name:      "sent_bytes_total",
 		Help:      "Number of bytes sent.",
 	}, []string{"host"})
+	droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "dropped_bytes_total",
+		Help:      "Number of bytes dropped because failed to be sent to the ingester after all retries.",
+	}, []string{"host"})
+	sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "sent_entries_total",
+		Help:      "Number of log entries sent to the ingester.",
+	}, []string{"host"})
+	droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "promtail",
+		Name:      "dropped_entries_total",
+		Help:      "Number of log entries dropped because failed to be sent to the ingester after all retries.",
+	}, []string{"host"})
 	requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "promtail",
 		Name:      "request_duration_seconds",
@@ -50,6 +65,9 @@ var (
 func init() {
 	prometheus.MustRegister(encodedBytes)
 	prometheus.MustRegister(sentBytes)
+	prometheus.MustRegister(droppedBytes)
+	prometheus.MustRegister(sentEntries)
+	prometheus.MustRegister(droppedEntries)
 	prometheus.MustRegister(requestDuration)
 }
 
@@ -153,7 +171,7 @@ func (c *client) run() {
 }
 
 func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
-	buf, err := encodeBatch(batch)
+	buf, entriesCount, err := encodeBatch(batch)
 	if err != nil {
 		level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
 		return
@@ -171,6 +189,7 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
 
 		if err == nil {
 			sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+			sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
 			return
 		}
 
@@ -185,22 +204,28 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
 
 	if err != nil {
 		level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
+		droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
+		droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
 	}
 }
 
-func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
+func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, int, error) {
 	req := logproto.PushRequest{
 		Streams: make([]*logproto.Stream, 0, len(batch)),
 	}
+
+	entriesCount := 0
 	for _, stream := range batch {
 		req.Streams = append(req.Streams, stream)
+		entriesCount += len(stream.Entries)
 	}
+
 	buf, err := proto.Marshal(&req)
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 	buf = snappy.Encode(nil, buf)
-	return buf, nil
+	return buf, entriesCount, nil
 }
 
 func (c *client) send(ctx context.Context, buf []byte) (int, error) {
diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go
index 0fe6edfa67c6ddceb112f0329af9ee2ce1019426..f03e06fd7c1398f50c7641f88afe944c1f216add 100644
--- a/pkg/promtail/client/client_test.go
+++ b/pkg/promtail/client/client_test.go
@@ -3,6 +3,7 @@ package client
 import (
 	"net/http"
 	"net/http/httptest"
+	"strings"
 	"testing"
 	"time"
 
@@ -14,17 +15,21 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/grafana/loki/pkg/logproto"
 	lokiflag "github.com/grafana/loki/pkg/util/flagext"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 )
 
-func TestClient_Handle(t *testing.T) {
-	logEntries := []entry{
+var (
+	logEntries = []entry{
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
 		{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
 	}
+)
 
+func TestClient_Handle(t *testing.T) {
 	tests := map[string]struct {
 		clientBatchSize      int
 		clientBatchWait      time.Duration
@@ -33,6 +38,7 @@ func TestClient_Handle(t *testing.T) {
 		inputEntries         []entry
 		inputDelay           time.Duration
 		expectedBatches      [][]*logproto.Stream
+		expectedMetrics      string
 	}{
 		"batch log entries together until the batch size is reached": {
 			clientBatchSize:      10,
@@ -48,6 +54,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
+				# TYPE promtail_sent_entries_total counter
+				promtail_sent_entries_total{host="__HOST__"} 3.0
+			`,
 		},
 		"batch log entries together until the batch wait time is reached": {
 			clientBatchSize:      10,
@@ -64,6 +75,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
+				# TYPE promtail_sent_entries_total counter
+				promtail_sent_entries_total{host="__HOST__"} 2.0
+			`,
 		},
 		"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
 			clientBatchSize:      10,
@@ -82,6 +98,11 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
+				# TYPE promtail_dropped_entries_total counter
+				promtail_dropped_entries_total{host="__HOST__"} 1.0
+			`,
 		},
 		"do not retry send a batch in case the server responds with a 4xx": {
 			clientBatchSize:      10,
@@ -94,11 +115,20 @@ func TestClient_Handle(t *testing.T) {
 					{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
 				},
 			},
+			expectedMetrics: `
+				# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
+				# TYPE promtail_dropped_entries_total counter
+				promtail_dropped_entries_total{host="__HOST__"} 1.0
+			`,
 		},
 	}
 
 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
+			// Reset metrics
+			sentEntries.Reset()
+			droppedEntries.Reset()
+
 			// Create a buffer channel where we do enqueue received requests
 			receivedReqsChan := make(chan logproto.PushRequest, 10)
 
@@ -156,6 +186,55 @@ func TestClient_Handle(t *testing.T) {
 			for i, batch := range receivedReqs {
 				assert.Equal(t, testData.expectedBatches[i], batch.Streams)
 			}
+
+			expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
+			err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
+			assert.NoError(t, err)
+		})
+	}
+}
+
+func TestClient_encodeBatch(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		inputBatch           map[model.Fingerprint]*logproto.Stream
+		expectedEntriesCount int
+	}{
+		"empty batch": {
+			inputBatch:           map[model.Fingerprint]*logproto.Stream{},
+			expectedEntriesCount: 0,
+		},
+		"single stream with single log entry": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
+			},
+			expectedEntriesCount: 1,
+		},
+		"single stream with multiple log entries": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
+			},
+			expectedEntriesCount: 2,
+		},
+		"multiple streams with multiple log entries": {
+			inputBatch: map[model.Fingerprint]*logproto.Stream{
+				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
+				model.Fingerprint(2): {Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
+			},
+			expectedEntriesCount: 3,
+		},
+	}
+
+	for testName, testData := range tests {
+		testData := testData
+
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
+
+			_, entriesCount, err := encodeBatch(testData.inputBatch)
+			require.NoError(t, err)
+			assert.Equal(t, testData.expectedEntriesCount, entriesCount)
 		})
 	}
 }