Skip to content

Commit

Permalink
pqarrow: centralize ReadValues when writing a page (#917)
Browse files Browse the repository at this point in the history
Currently, when converting a parquet page to an arrow record, all the writers
would repeat the slow path of allocating a parquet.Values slice, reading all
values, and writing them to their underlying builder. However, this code already
existed one level above and is more efficient since it reuses a parquet.Values
slice.

This commit removes the repetition from the writers and leaves only the
concrete implementation of writing existing values to an arrow builder. Callers
can also check whether the ValueWriter implements the PageWriter interface,
which may offer a fast path for writing a parquet page directly.

The improvement is especially noticeable in Query/Values since the slow path
would previously fall back to write all the page values, rather than just the
dictionary values.

```
                │  benchmain   │               benchpw                │
                │    sec/op    │    sec/op     vs base                │
Query/Types-12     109.9m ± 1%    109.7m ± 2%        ~ (p=0.353 n=10)
Query/Labels-12    219.5µ ± 1%    214.1µ ± 2%   -2.46% (p=0.011 n=10)
Query/Values-12   7716.3µ ± 3%    207.7µ ± 4%  -97.31% (p=0.000 n=10)
Query/Merge-12     223.1m ± 2%    220.6m ± 1%   -1.08% (p=0.035 n=10)
Query/Range-12     117.5m ± 1%    116.1m ± 2%        ~ (p=0.218 n=10)
Query/Filter-12    9.888m ± 3%   10.025m ± 4%        ~ (p=0.684 n=10)
geomean            19.08m         10.38m       -45.58%

                │   benchmain    │               benchpw                │
                │      B/op      │     B/op      vs base                │
Query/Types-12      254.3Mi ± 1%   252.1Mi ± 3%        ~ (p=0.353 n=10)
Query/Labels-12     400.6Ki ± 0%   400.7Ki ± 0%        ~ (p=0.796 n=10)
Query/Values-12   12644.7Ki ± 0%   853.5Ki ± 0%  -93.25% (p=0.000 n=10)
Query/Merge-12      574.7Mi ± 1%   576.6Mi ± 1%        ~ (p=0.247 n=10)
Query/Range-12      212.0Mi ± 0%   212.0Mi ± 0%        ~ (p=0.190 n=10)
Query/Filter-12     13.52Mi ± 0%   13.52Mi ± 0%        ~ (p=0.739 n=10)
geomean             35.56Mi        22.67Mi       -36.25%

                │  benchmain  │               benchpw               │
                │  allocs/op  │  allocs/op   vs base                │
Query/Types-12    64.32k ± 6%   64.30k ± 4%        ~ (p=0.631 n=10)
Query/Labels-12   1.802k ± 0%   1.802k ± 0%        ~ (p=0.840 n=10)
Query/Values-12   3.677k ± 0%   2.192k ± 0%  -40.37% (p=0.000 n=10)
Query/Merge-12    1.435M ± 0%   1.435M ± 0%        ~ (p=0.424 n=10)
Query/Range-12    174.3k ± 0%   174.2k ± 0%   -0.00% (p=0.044 n=10)
Query/Filter-12   4.255k ± 0%   4.255k ± 0%        ~ (p=0.487 n=10)
geomean           27.72k        25.43k        -8.25%

                │  benchmain   │            benchpw             │
                │    B/msec    │    B/msec     vs base          │
Query/Filter-12   3.238Mi ± 3%   3.194Mi ± 4%  ~ (p=0.724 n=10)
```
  • Loading branch information
asubiotto authored Jun 27, 2024
1 parent 56013c3 commit 6206acd
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 168 deletions.
53 changes: 30 additions & 23 deletions pqarrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pqarrow

import (
"context"
"errors"
"fmt"
"io"
"sort"
Expand Down Expand Up @@ -780,36 +781,42 @@ func (c *ParquetConverter) writeColumnToArray(
}
return fmt.Errorf("read page: %w", err)
}
dict := p.Dictionary()

switch {
case !repeated && dictionaryOnly && dict != nil && p.NumNulls() == 0:
// If we are only writing the dictionary, we don't need to read
// the values.
if err := w.WritePage(dict.Page()); err != nil {
return fmt.Errorf("write dictionary page: %w", err)
dict := p.Dictionary()
if dict != nil && dictionaryOnly {
// We only want distinct values; write only the dictionary page.
if p.NumNulls() > 0 {
// Since dictionary pages do not represent nulls, write a null
// value if the non-dictionary page has at least one null.
w.Write([]parquet.Value{parquet.NullValue()})
}
case !repeated && p.NumNulls() == 0 && dict == nil:
// If the column has no nulls, we can read all values at once
// consecutively without worrying about null values.
if err := w.WritePage(p); err != nil {
p = dict.Page()
}

if pw, ok := w.(writer.PageWriter); ok {
err := pw.WritePage(p)
if err == nil {
continue
} else if err != nil && !errors.Is(err, writer.ErrCannotWritePageDirectly) {
return fmt.Errorf("write page: %w", err)
}
default:
if n := p.NumValues(); int64(cap(c.scratchValues)) < n {
c.scratchValues = make([]parquet.Value, n)
} else {
c.scratchValues = c.scratchValues[:n]
}
// Could not write page directly, fall through to slow path.
}

// We're reading all values in the page so we always expect an io.EOF.
reader := p.Values()
if _, err := reader.ReadValues(c.scratchValues); err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}
// Write values using the slow path.
n := p.NumValues()
if int64(cap(c.scratchValues)) < n {
c.scratchValues = make([]parquet.Value, n)
}
c.scratchValues = c.scratchValues[:n]

w.Write(c.scratchValues)
// We're reading all values in the page so we always expect an io.EOF.
reader := p.Values()
if _, err := reader.ReadValues(c.scratchValues); err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.Write(c.scratchValues)
}

return nil
Expand Down
193 changes: 48 additions & 145 deletions pqarrow/writer/writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package writer

import (
"errors"
"fmt"
"io"

Expand All @@ -11,15 +12,32 @@ import (
)

type ValueWriter interface {
WritePage(p parquet.Page) error
// Write writes a slice of values to the underlying builder (slow path).
Write([]parquet.Value)
}

type PageWriter interface {
ValueWriter
// WritePage is the optimized path for writing a page of values to the
// underlying builder. There are cases in which the given page cannot be
// written directly, in which case ErrCannotWritePageDirectly is returned.
// The caller should fall back to writing values.
WritePage(parquet.Page) error
}

var ErrCannotWritePageDirectly = errors.New("cannot write page directly")

type writerBase struct{}

func (w writerBase) canWritePageDirectly(p parquet.Page) bool {
// Currently, for most writers, only pages with zero nulls and no dictionary
// can be written.
return p.NumNulls() == 0 && p.Dictionary() == nil
}

type binaryValueWriter struct {
b *builder.OptBinaryBuilder
scratch struct {
values []parquet.Value
}
writerBase
b *builder.OptBinaryBuilder
}

type NewWriterFunc func(b builder.ColumnBuilder, numValues int) ValueWriter
Expand All @@ -37,31 +55,16 @@ func (w *binaryValueWriter) Write(values []parquet.Value) {
}

func (w *binaryValueWriter) WritePage(p parquet.Page) error {
if p.NumNulls() != 0 {
reader := p.Values()
if cap(w.scratch.values) < int(p.NumValues()) {
w.scratch.values = make([]parquet.Value, p.NumValues())
}
w.scratch.values = w.scratch.values[:p.NumValues()]
_, err := reader.ReadValues(w.scratch.values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}
w.Write(w.scratch.values)
return nil
if !w.canWritePageDirectly(p) {
return ErrCannotWritePageDirectly
}

// No nulls in page.
values := p.Data()
return w.b.AppendData(values.ByteArray())
}

type int64ValueWriter struct {
b *builder.OptInt64Builder
scratch struct {
values []parquet.Value
}
writerBase
b *builder.OptInt64Builder
}

func NewInt64ValueWriter(b builder.ColumnBuilder, _ int) ValueWriter {
Expand All @@ -76,21 +79,9 @@ func (w *int64ValueWriter) Write(values []parquet.Value) {
}

func (w *int64ValueWriter) WritePage(p parquet.Page) error {
if p.NumNulls() != 0 {
reader := p.Values()
if cap(w.scratch.values) < int(p.NumValues()) {
w.scratch.values = make([]parquet.Value, p.NumValues())
}
w.scratch.values = w.scratch.values[:p.NumValues()]
_, err := reader.ReadValues(w.scratch.values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}
w.Write(w.scratch.values)
return nil
if !w.canWritePageDirectly(p) {
return ErrCannotWritePageDirectly
}

// No nulls in page.
values := p.Data()
w.b.AppendData(values.Int64())
Expand Down Expand Up @@ -122,22 +113,6 @@ func (w *uint64ValueWriter) Write(values []parquet.Value) {
}
}

// TODO: implement fast path of writing the whole page directly.
func (w *uint64ValueWriter) WritePage(p parquet.Page) error {
reader := p.Values()

values := make([]parquet.Value, p.NumValues())
_, err := reader.ReadValues(values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.Write(values)

return nil
}

type repeatedValueWriter struct {
b *builder.ListBuilder
values ValueWriter
Expand Down Expand Up @@ -182,22 +157,6 @@ func (w *repeatedValueWriter) Write(values []parquet.Value) {
}
}

// TODO: implement fast path of writing the whole page directly.
func (w *repeatedValueWriter) WritePage(p parquet.Page) error {
reader := p.Values()

values := make([]parquet.Value, p.NumValues())
_, err := reader.ReadValues(values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.Write(values)

return nil
}

type float64ValueWriter struct {
b *array.Float64Builder
buf []float64
Expand All @@ -222,46 +181,32 @@ func (w *float64ValueWriter) Write(values []parquet.Value) {
}

func (w *float64ValueWriter) WritePage(p parquet.Page) error {
reader := p.Values()
ireader, ok := p.Values().(parquet.DoubleReader)
if !ok {
return ErrCannotWritePageDirectly
}

ireader, ok := reader.(parquet.DoubleReader)
if ok {
// fast path
if w.buf == nil {
w.buf = make([]float64, p.NumValues())
if w.buf == nil {
w.buf = make([]float64, p.NumValues())
}
values := w.buf
for {
n, err := ireader.ReadDoubles(values)
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}
values := w.buf
for {
n, err := ireader.ReadDoubles(values)
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.b.AppendValues(values[:n], nil)
if err == io.EOF {
break
}
w.b.AppendValues(values[:n], nil)
if err == io.EOF {
break
}
return nil
}

values := make([]parquet.Value, p.NumValues())
_, err := reader.ReadValues(values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.Write(values)

return nil
}

type booleanValueWriter struct {
b *builder.OptBooleanBuilder
scratch struct {
values []parquet.Value
}
writerBase
b *builder.OptBooleanBuilder
}

func NewBooleanValueWriter(b builder.ColumnBuilder, numValues int) ValueWriter {
Expand All @@ -277,22 +222,9 @@ func (w *booleanValueWriter) Write(values []parquet.Value) {
}

func (w *booleanValueWriter) WritePage(p parquet.Page) error {
if p.NumNulls() != 0 {
reader := p.Values()
if cap(w.scratch.values) < int(p.NumValues()) {
w.scratch.values = make([]parquet.Value, p.NumValues())
}
w.scratch.values = w.scratch.values[:p.NumValues()]
_, err := reader.ReadValues(w.scratch.values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}
w.Write(w.scratch.values)
return nil
if !w.canWritePageDirectly(p) {
return ErrCannotWritePageDirectly
}

// No nulls in page.
values := p.Data()
w.b.Append(values.Boolean(), int(p.NumValues()))
return nil
Expand All @@ -313,19 +245,6 @@ func NewStructWriterFromOffset(offset int) NewWriterFunc {
}
}

func (s *structWriter) WritePage(p parquet.Page) error {
// TODO: there's probably a more optimized way to handle a page of values here; but doing this for simplicity of implementation right meow.
values := make([]parquet.Value, p.NumValues())
_, err := p.Values().ReadValues(values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

s.Write(values)
return nil
}

func (s *structWriter) Write(values []parquet.Value) {
total := 0
for _, v := range values {
Expand Down Expand Up @@ -455,10 +374,6 @@ func NewMapWriter(b builder.ColumnBuilder, _ int) ValueWriter {
}
}

func (m *mapWriter) WritePage(_ parquet.Page) error {
panic("not implemented")
}

func (m *mapWriter) Write(_ []parquet.Value) {
panic("not implemented")
}
Expand Down Expand Up @@ -488,15 +403,3 @@ func (w *dictionaryValueWriter) Write(values []parquet.Value) {
}
}
}

func (w *dictionaryValueWriter) WritePage(p parquet.Page) error {
values := make([]parquet.Value, p.NumValues())
_, err := p.Values().ReadValues(values)
// We're reading all values in the page so we always expect an io.EOF.
if err != nil && err != io.EOF {
return fmt.Errorf("read values: %w", err)
}

w.Write(values)
return nil
}

0 comments on commit 6206acd

Please sign in to comment.