From 7642538d396c93d5fd2a42c6d9de43b3465747d4 Mon Sep 17 00:00:00 2001 From: Oleksandr Pryimak Date: Wed, 13 May 2020 00:44:50 -0700 Subject: [PATCH] Use Pool to store and reuse GZIPWriters GZIPWriter are very expensive to create and leads to allocating a lot of memory. They are perfect for reusing (they have .reset method for this) --- client.go | 20 +++++++++++++++++++- request.go | 14 +++++++------- request_test.go | 18 +++++++++++------- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/client.go b/client.go index 95ea6283c..d9f981288 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ package elastic import ( "bytes" "context" + "compress/gzip" "encoding/json" "fmt" "log" @@ -102,6 +103,12 @@ var ( noDeprecationLog = func(*http.Request, *http.Response) {} ) +var gzipWriterPool = sync.Pool { + New: func()interface{} { + return gzip.NewWriter(new(bytes.Buffer)) + }, +} + // ClientOptionFunc is a function that configures a Client. // It is used in NewClient. type ClientOptionFunc func(*Client) error @@ -1347,7 +1354,18 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) // Set body if opt.Body != nil { - err = req.SetBody(opt.Body, gzipEnabled) + var gzipWriter *gzip.Writer; + if gzipEnabled { + item := gzipWriterPool.Get() + if item == nil { + c.infof("Got nothing from the pool. Creating a new gzip compressor...") + gzipWriter = gzip.NewWriter(new(bytes.Buffer)) + } else { + gzipWriter = item.(*gzip.Writer) + } + defer gzipWriterPool.Put(gzipWriter) + } + err = req.SetBody(opt.Body, gzipEnabled, gzipWriter) if err != nil { c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err) return nil, err diff --git a/request.go b/request.go index 288c837ab..9b138dd19 100644 --- a/request.go +++ b/request.go @@ -37,16 +37,16 @@ func (r *Request) SetBasicAuth(username, password string) { // SetBody encodes the body in the request. You may pass a flag to // compress the request via gzip. -func (r *Request) SetBody(body interface{}, gzipCompress bool) error { +func (r *Request) SetBody(body interface{}, gzipCompress bool, writer *gzip.Writer) error { switch b := body.(type) { case string: if gzipCompress { - return r.setBodyGzip(b) + return r.setBodyGzip(b, writer) } return r.setBodyString(b) default: if gzipCompress { - return r.setBodyGzip(body) + return r.setBodyGzip(body, writer) } return r.setBodyJson(body) } @@ -70,7 +70,7 @@ func (r *Request) setBodyString(body string) error { // setBodyGzip gzip's the body. It accepts both strings and structs as body. // The latter will be encoded via json.Marshal. -func (r *Request) setBodyGzip(body interface{}) error { +func (r *Request) setBodyGzip(body interface{}, writer *gzip.Writer) error { switch b := body.(type) { case string: buf := new(bytes.Buffer) @@ -90,11 +90,11 @@ func (r *Request) setBodyGzip(body interface{}) error { return err } buf := new(bytes.Buffer) - w := gzip.NewWriter(buf) - if _, err := w.Write(data); err != nil { + writer.Reset(buf) + if _, err := writer.Write(data); err != nil { return err } - if err := w.Close(); err != nil { + if err := writer.Close(); err != nil { return err } r.Header.Add("Content-Encoding", "gzip") diff --git a/request_test.go b/request_test.go index 66e80b991..7b7e25ac4 100644 --- a/request_test.go +++ b/request_test.go @@ -4,7 +4,11 @@ package elastic -import "testing" +import ( + "bytes" + "compress/gzip" + "testing" +) var testReq *Request // used as a temporary variable to avoid compiler optimizations in tests/benchmarks @@ -29,7 +33,7 @@ func BenchmarkRequestSetBodyString(b *testing.B) { } for i := 0; i < b.N; i++ { body := `{"query":{"match_all":{}}}` - err = req.SetBody(body, false) + err = req.SetBody(body, false, nil) if err != nil { b.Fatal(err) } @@ -45,7 +49,7 @@ func BenchmarkRequestSetBodyStringGzip(b *testing.B) { } for i := 0; i < b.N; i++ { body := `{"query":{"match_all":{}}}` - err = req.SetBody(body, true) + err = req.SetBody(body, true, gzip.NewWriter(new(bytes.Buffer))) if err != nil { b.Fatal(err) } @@ -61,7 +65,7 @@ func BenchmarkRequestSetBodyBytes(b *testing.B) { } for i := 0; i < b.N; i++ { body := []byte(`{"query":{"match_all":{}}}`) - err = req.SetBody(body, false) + err = req.SetBody(body, false, nil) if err != nil { b.Fatal(err) } @@ -77,7 +81,7 @@ func BenchmarkRequestSetBodyBytesGzip(b *testing.B) { } for i := 0; i < b.N; i++ { body := []byte(`{"query":{"match_all":{}}}`) - err = req.SetBody(body, true) + err = req.SetBody(body, true, gzip.NewWriter(new(bytes.Buffer))) if err != nil { b.Fatal(err) } @@ -97,7 +101,7 @@ func BenchmarkRequestSetBodyMap(b *testing.B) { "match_all": map[string]interface{}{}, }, } - err = req.SetBody(body, false) + err = req.SetBody(body, false, nil) if err != nil { b.Fatal(err) } @@ -117,7 +121,7 @@ func BenchmarkRequestSetBodyMapGzip(b *testing.B) { "match_all": map[string]interface{}{}, }, } - err = req.SetBody(body, true) + err = req.SetBody(body, true, gzip.NewWriter(new(bytes.Buffer))) if err != nil { b.Fatal(err) }