Stream HTTP response body read for decompression (#1213)
* Create reader instead of reading Response.Body to avoid huge memory consumption

* Fix test
rogeryk authored Mar 7, 2024
1 parent 40eda3f commit 4c8219e
Showing 3 changed files with 70 additions and 81 deletions.
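The change, as the commit message puts it, is to stop reading the whole HTTP response body into memory before decoding and instead hand the decoder a streaming (and, where needed, decompressing) reader, so memory use no longer scales with the size of the result set. A minimal, self-contained sketch of that idea using only the standard library — the test server, URL and helper name here are illustrative, not part of the driver:

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newBodyReader returns a reader that streams the (possibly gzip-compressed)
// response body. Decompression happens lazily as the caller reads, so the
// full payload is never buffered the way io.ReadAll(res.Body) would buffer it.
func newBodyReader(res *http.Response) (io.Reader, error) {
	if !res.Uncompressed && res.Header.Get("Content-Encoding") == "gzip" {
		return gzip.NewReader(res.Body)
	}
	return res.Body, nil
}

func main() {
	// Test server that answers with a gzip-compressed body.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Encoding", "gzip")
		zw := gzip.NewWriter(w)
		defer zw.Close()
		io.Copy(zw, strings.NewReader(strings.Repeat("clickhouse block\n", 1024)))
	}))
	defer srv.Close()

	req, _ := http.NewRequest(http.MethodGet, srv.URL, nil)
	// Setting Accept-Encoding explicitly disables Go's transparent
	// decompression, which is the situation the driver handles itself.
	req.Header.Set("Accept-Encoding", "gzip")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	r, err := newBodyReader(res)
	if err != nil {
		panic(err)
	}
	n, _ := io.Copy(io.Discard, r) // memory use stays flat regardless of body size
	fmt.Println("decompressed bytes:", n)
}
```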
14 changes: 13 additions & 1 deletion clickhouse_rows.go
@@ -46,6 +46,9 @@ func (r *rows) Next() (result bool) {
}
next:
if r.row >= r.block.Rows() {
if r.stream == nil {
return false
}
select {
case err := <-r.errors:
if err != nil {
@@ -95,7 +98,16 @@ func (r *rows) Columns() []string {
}

func (r *rows) Close() error {
active := 2
if r.errors == nil && r.stream == nil {
return r.err
}
active := 0
if r.errors != nil {
active++
}
if r.stream != nil {
active++
}
for {
select {
case _, ok := <-r.stream:
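Close() previously hard-coded `active := 2`, assuming both the block stream and the error channel always exist; the rewrite counts only the channels that were actually created and Next() bails out when there is no stream at all, so a rows value built without the streaming machinery (nil channels) can still be iterated and closed safely. A standalone sketch of that drain pattern — the int payload and function name are illustrative, not the driver's types:

```go
package main

import "fmt"

// drain loosely mirrors the Close accounting added above: count how many
// channels actually exist, then select until each existing one has been
// closed, so a missing background stream (nil channel) no longer hangs the loop.
func drain(stream <-chan int, errs <-chan error) error {
	var err error
	active := 0
	if stream != nil {
		active++
	}
	if errs != nil {
		active++
	}
	for active > 0 {
		select {
		case _, ok := <-stream:
			if !ok {
				active--
				stream = nil // stop selecting on the already-closed channel
			}
		case e, ok := <-errs:
			if !ok {
				active--
				errs = nil
				continue
			}
			if e != nil {
				err = e
			}
		}
	}
	return err
}

func main() {
	// No background producer at all: both channels are nil and drain returns immediately.
	fmt.Println("no producer:", drain(nil, nil))

	// A producer that buffered two blocks before closing both channels.
	stream, errs := make(chan int, 2), make(chan error)
	stream <- 1
	stream <- 2
	close(stream)
	close(errs)
	fmt.Println("with producer:", drain(stream, errs))
}
```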
66 changes: 21 additions & 45 deletions conn_http.go
@@ -75,49 +75,32 @@ type HTTPReaderWriter struct {
method CompressionMethod
}

func (rw HTTPReaderWriter) read(res *http.Response) ([]byte, error) {
// NewReader will return a reader that will decompress data if needed.
func (rw *HTTPReaderWriter) NewReader(res *http.Response) (io.Reader, error) {
enc := res.Header.Get("Content-Encoding")
if !res.Uncompressed && rw.method.String() == enc {
switch rw.method {
case CompressionGZIP:
reader := rw.reader.(*gzip.Reader)
defer reader.Close()
if err := reader.Reset(res.Body); err != nil {
return nil, err
}
body, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
return body, nil
return reader, nil
case CompressionDeflate:
reader := rw.reader.(io.ReadCloser)
defer reader.Close()
if err := rw.reader.(flate.Resetter).Reset(res.Body, nil); err != nil {
return nil, err
}
body, err := io.ReadAll(reader)
if err != nil {
reader := rw.reader
if err := reader.(flate.Resetter).Reset(res.Body, nil); err != nil {
return nil, err
}
return body, nil
return reader, nil
case CompressionBrotli:
reader := rw.reader.(*brotli.Reader)
if err := reader.Reset(res.Body); err != nil {
return nil, err
}
body, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
return body, nil
return reader, nil
}
}
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
return body, nil
return res.Body, nil
}

func (rw *HTTPReaderWriter) reset(pw *io.PipeWriter) io.WriteCloser {
@@ -436,27 +419,21 @@ func (h *httpConnect) sendQuery(ctx context.Context, query string, options *Quer

func (h *httpConnect) readRawResponse(response *http.Response) (body []byte, err error) {
rw := h.compressionPool.Get()
defer response.Body.Close()
defer h.compressionPool.Put(rw)
if body, err = rw.read(response); err != nil {

reader, err := rw.NewReader(response)
if err != nil {
return nil, err
}
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
result := make([]byte, len(body))
reader := chproto.NewReader(bytes.NewReader(body))
reader.EnableCompression()
defer reader.DisableCompression()
for {
b, err := reader.ReadByte()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
}
result = append(result, b)
}
return result, nil
chReader := chproto.NewReader(reader)
chReader.EnableCompression()
reader = chReader
}

body, err = io.ReadAll(reader)
if err != nil {
return nil, err
}
return body, nil
}
@@ -549,14 +526,13 @@ func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {

if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
msg, err := h.readRawResponse(resp)

if err != nil {
return nil, fmt.Errorf("clickhouse [execute]:: %d code: failed to read the response: %w", resp.StatusCode, err)
}

return nil, fmt.Errorf("clickhouse [execute]:: %d code: %s", resp.StatusCode, string(msg))
}
return resp, nil
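In conn_http.go, read() used to hand back a fully materialized []byte (and readRawResponse then copied it byte by byte through a chproto reader for LZ4/ZSTD); the replacement NewReader returns an io.Reader, and readRawResponse now simply layers readers — decompression first, then ch-go's proto reader when native compression is enabled — before a single io.ReadAll at the end. The non-200 path in executeRequest still funnels the error body through the same helper. A standard-library-only sketch of that reader-layering idea, with bufio standing in for the chproto.Reader wrapper:

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// decorate layers readers the way readRawResponse now does: every wrapper
// transforms the stream lazily, and bytes are only pulled through the whole
// chain by a single io.ReadAll at the very end.
func decorate(r io.Reader, compressed bool) (io.Reader, error) {
	if compressed {
		zr, err := gzip.NewReader(r)
		if err != nil {
			return nil, err
		}
		r = zr // layer 1: streaming decompression
	}
	// layer 2: buffering, a stand-in here for wrapping with chproto.NewReader
	return bufio.NewReader(r), nil
}

func main() {
	// Build a gzip-compressed payload to stand in for the HTTP response body.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	zw.Write([]byte("streamed query result payload"))
	zw.Close()

	r, err := decorate(&compressed, true)
	if err != nil {
		panic(err)
	}
	body, err := io.ReadAll(r) // the only place the payload is fully materialized
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```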
71 changes: 36 additions & 35 deletions conn_http_query.go
@@ -18,12 +18,12 @@
package clickhouse

import (
"bytes"
"context"
"errors"
"io"

chproto "github.com/ClickHouse/ch-go/proto"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
"io"
)

// release is ignored, because http used by std with empty release function
@@ -50,52 +50,48 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
if err != nil {
return nil, err
}
defer res.Body.Close()
// detect compression from http Content-Encoding header - note user will need to have set enable_http_compression
// for CH to respond with compressed data - we don't set this automatically as they might not have permissions
var body []byte
//adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190

rw := h.compressionPool.Get()
body, err = rw.read(res)
bufferSize := h.blockBufferSize
if options.blockBufferSize > 0 {
// allow block buffer sze to be overridden per query
bufferSize = options.blockBufferSize
}
var (
errCh = make(chan error)
stream = make(chan *proto.Block, bufferSize)
)

if len(body) == 0 {
// queries with no results can get an empty body
go func() {
close(stream)
close(errCh)
}()
if res.ContentLength == 0 {
block := &proto.Block{}
return &rows{
err: nil,
stream: stream,
errors: errCh,
block: &proto.Block{},
columns: []string{},
block: block,
columns: block.ColumnsNames(),
structMap: &structMap{},
}, nil
}

rw := h.compressionPool.Get()
// The HTTPReaderWriter.NewReader will create a reader that will decompress it if needed,
// cause adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed
// per https://github.com/golang/go/blob/master/src/net/http/transport.go#L182-L190.
// Note user will need to have set enable_http_compression for CH to respond with compressed data. we don't set this
// automatically as they might not have permissions.
reader, err := rw.NewReader(res)
if err != nil {
res.Body.Close()
h.compressionPool.Put(rw)
return nil, err
}
h.compressionPool.Put(rw)
reader := chproto.NewReader(bytes.NewReader(body))
block, err := h.readData(ctx, reader)
if err != nil {
chReader := chproto.NewReader(reader)
block, err := h.readData(ctx, chReader)
if err != nil && !errors.Is(err, io.EOF) {
res.Body.Close()
h.compressionPool.Put(rw)
return nil, err
}

bufferSize := h.blockBufferSize
if options.blockBufferSize > 0 {
// allow block buffer sze to be overridden per query
bufferSize = options.blockBufferSize
}
var (
errCh = make(chan error)
stream = make(chan *proto.Block, bufferSize)
)
go func() {
for {
block, err := h.readData(ctx, reader)
block, err := h.readData(ctx, chReader)
if err != nil {
// ch-go wraps EOF errors
if !errors.Is(err, io.EOF) {
Expand All @@ -110,10 +106,15 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
case stream <- block:
}
}
res.Body.Close()
h.compressionPool.Put(rw)
close(stream)
close(errCh)
}()

if block == nil {
block = &proto.Block{}
}
return &rows{
block: block,
stream: stream,
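The query path no longer defers res.Body.Close() up front: the body stays open for the lifetime of the streaming goroutine, which decodes blocks from the (possibly decompressed) reader into a buffered channel and, only once it finishes, closes the body, returns the compression reader to the pool, and closes both channels. A rough sketch of that producer/consumer shape, with bufio.Scanner lines standing in for decoded blocks — the names here are illustrative, not the driver's API:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// streamBlocks mimics the shape of the query path after this change: the body
// reader stays open, a goroutine decodes units ("blocks") from it into a
// buffered channel, and every bit of cleanup runs only once streaming is done.
func streamBlocks(body io.ReadCloser, bufferSize int) (<-chan string, <-chan error) {
	stream := make(chan string, bufferSize)
	// Buffered so the producer never blocks on reporting a final error.
	errCh := make(chan error, 1)
	go func() {
		defer body.Close() // cleanup is deferred to the end of streaming
		sc := bufio.NewScanner(body)
		for sc.Scan() {
			stream <- sc.Text() // back-pressure once the buffer is full
		}
		if err := sc.Err(); err != nil {
			errCh <- err
		}
		close(stream)
		close(errCh)
	}()
	return stream, errCh
}

func main() {
	body := io.NopCloser(strings.NewReader("block-1\nblock-2\nblock-3\n"))
	stream, errCh := streamBlocks(body, 2)
	for block := range stream {
		fmt.Println("got:", block)
	}
	if err := <-errCh; err != nil {
		fmt.Println("stream error:", err)
	}
}
```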
