Skip to content

Commit

Permalink
pqarrow/builder: Add OptFloat64Builder (#823)
Browse files Browse the repository at this point in the history
* pqarrow/builder: Add OptFloat64Builder

We mostly need this because, unlike the upstream Float64Builder, it supports the Set and Add methods.

* pqarrow/builder: Add test for OptBuilders
  • Loading branch information
metalmatze authored Apr 18, 2024
1 parent 2ca8069 commit 1fad9e6
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 2 deletions.
124 changes: 124 additions & 0 deletions pqarrow/builder/optbuilders.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ var (
_ OptimizedBuilder = (*OptBinaryBuilder)(nil)
_ OptimizedBuilder = (*OptInt64Builder)(nil)
_ OptimizedBuilder = (*OptBooleanBuilder)(nil)
_ OptimizedBuilder = (*OptFloat64Builder)(nil)
)

// OptBinaryBuilder is an optimized array.BinaryBuilder.
Expand Down Expand Up @@ -668,3 +669,126 @@ func (b *OptInt32Builder) Reserve(n int) {
b.data = slices.Grow(b.data, n)[:n]
b.validityBitmap = resizeBitmap(b.validityBitmap, n)
}

// OptFloat64Builder is an optimized builder for float64 arrays. Unlike the
// upstream Float64Builder, it exposes Set and Add so that values which have
// already been appended can be modified in place.
type OptFloat64Builder struct {
builderBase

// data holds one float64 per slot, including slots that are null.
data []float64
}

// NewOptFloat64Builder returns an empty OptFloat64Builder. The given dtype is
// stored on the builder and later passed to the array data created by
// NewArray.
func NewOptFloat64Builder(dtype arrow.DataType) *OptFloat64Builder {
	builder := new(OptFloat64Builder)
	builder.dtype = dtype
	return builder
}

// resizeData sets len(b.data) to neededLength.
//
// If the current capacity is too small, a new backing slice sized by
// bitutil.NextPowerOf2(neededLength) is allocated and the existing values are
// copied over. Otherwise the slice is only re-sliced, so any slots that become
// visible again keep whatever was stored in them before.
func (b *OptFloat64Builder) resizeData(neededLength int) {
	if neededLength > cap(b.data) {
		grown := make([]float64, bitutil.NextPowerOf2(neededLength))
		copy(grown, b.data)
		b.data = grown
	}
	b.data = b.data[:neededLength]
}

// Release atomically decrements the builder's reference count. When the count
// reaches zero the value buffer is dropped and releaseInternal is called.
//
// NOTE(review): NewOptFloat64Builder does not set refCount itself; confirm
// where the count is initialised or incremented.
func (b *OptFloat64Builder) Release() {
	if atomic.AddInt64(&b.refCount, -1) != 0 {
		// Other references remain (or the count was not at one); keep the data.
		return
	}
	b.data = nil
	b.releaseInternal()
}

// AppendNull appends a single null value to the builder by delegating to
// AppendNulls(1).
func (b *OptFloat64Builder) AppendNull() {
b.AppendNulls(1)
}

// AppendNulls appends n null values to the builder.
//
// The value buffer is first extended to b.length+n slots, then the call is
// forwarded to builderBase.AppendNulls (defined outside this view), which is
// presumably responsible for the validity bitmap and length bookkeeping.
func (b *OptFloat64Builder) AppendNulls(n int) {
	// Must be computed before the base builder is invoked.
	target := b.length + n
	b.resizeData(target)
	b.builderBase.AppendNulls(n)
}

// NewArray builds a float64 arrow.Array from the values appended so far and
// resets the builder.
//
// The value slice is reinterpreted as bytes without copying, and both it and
// the validity bitmap are wrapped with memory.NewBufferBytes. After the array
// data is created the builder is reset and its reference to the value slice is
// dropped, so the returned array is the only remaining user of that memory
// from the builder's point of view.
//
// NOTE(review): the local *array.Data is never released in this function;
// confirm whether that is intentional.
func (b *OptFloat64Builder) NewArray() arrow.Array {
	// View the []float64 backing store as raw bytes (no copy).
	rawValues := unsafe.Slice(
		(*byte)(unsafe.Pointer(unsafe.SliceData(b.data))),
		len(b.data)*arrow.Float64SizeBytes,
	)

	buffers := []*memory.Buffer{
		memory.NewBufferBytes(b.validityBitmap),
		memory.NewBufferBytes(rawValues),
	}
	// Null count = total slots minus the bits set in the validity bitmap.
	nullCount := b.length - bitutil.CountSetBits(b.validityBitmap, 0, b.length)

	arrayData := array.NewData(b.dtype, b.length, buffers, nil, nullCount, 0)

	b.reset()
	b.data = nil
	return array.NewFloat64Data(arrayData)
}

// AppendData appends every element of data to the builder and marks all of the
// newly added slots as valid (non-null) in the validity bitmap.
func (b *OptFloat64Builder) AppendData(data []float64) {
	start, added := b.length, len(data)

	b.data = append(b.data, data...)
	b.length = start + added

	// Grow the bitmap to cover the new length, then flag the new range as valid.
	b.validityBitmap = resizeBitmap(b.validityBitmap, b.length)
	bitutil.SetBitsTo(b.validityBitmap, int64(start), int64(added), true)
}

// Append adds a single non-null value to the end of the builder.
func (b *OptFloat64Builder) Append(v float64) {
	slot := b.length

	b.data = append(b.data, v)
	b.length = slot + 1

	// Grow the bitmap to cover the new slot and mark it as valid.
	b.validityBitmap = resizeBitmap(b.validityBitmap, b.length)
	bitutil.SetBit(b.validityBitmap, slot)
}

// Set overwrites the value stored at index i with v. Only the value buffer is
// written; the validity bitmap and the builder length are left untouched.
// Indexing is unchecked here, so an out-of-range i panics.
func (b *OptFloat64Builder) Set(i int, v float64) {
b.data[i] = v
}

// Add adds v to the value stored at index i, in place. Only the value buffer
// is written; the validity bitmap and the builder length are left untouched.
// Indexing is unchecked here, so an out-of-range i panics.
func (b *OptFloat64Builder) Add(i int, v float64) {
b.data[i] += v
}

// Value returns the ith value of the builder. The validity bitmap is not
// consulted, so for a null slot this returns whatever is stored in the value
// buffer. Indexing is unchecked here, so an out-of-range i panics.
func (b *OptFloat64Builder) Value(i int) float64 {
return b.data[i]
}

// AppendParquetValues appends each parquet value to the builder.
//
// Every value is read with Double() and stored in the value buffer; the
// validity bit of the slot is set according to whether the parquet value
// reports IsNull(). The builder length is advanced by len(values) at the end.
func (b *OptFloat64Builder) AppendParquetValues(values []parquet.Value) {
	base := b.length
	total := base + len(values)

	b.resizeData(total)
	b.validityBitmap = resizeBitmap(b.validityBitmap, total)

	for offset := range values {
		slot := base + offset
		b.data[slot] = values[offset].Double()
		bitutil.SetBitTo(b.validityBitmap, slot, !values[offset].IsNull())
	}

	b.length = total
}

// RepeatLastValue appends n copies of the most recently appended entry.
//
// If the last entry is null, n nulls are appended. Otherwise the last value is
// written into n new slots, which are then marked valid via appendValid. The
// returned error is always nil in this implementation.
//
// The last entry is looked up at index b.length-1, so the builder must contain
// at least one entry when this is called.
func (b *OptFloat64Builder) RepeatLastValue(n int) error {
	last := b.length - 1
	if bitutil.BitIsNotSet(b.validityBitmap, last) {
		b.AppendNulls(n)
		return nil
	}

	// Read the value before resizing, since resizing may reallocate b.data.
	repeated := b.data[last]
	start, end := b.length, b.length+n
	b.resizeData(end)
	for i := start; i < end; i++ {
		b.data[i] = repeated
	}
	b.appendValid(n)
	return nil
}

// ResetToLength sets the builder's length to n, re-slicing the value buffer
// and resizing the validity bitmap to match. It is a no-op when n already
// equals the current length.
//
// This method is specific to distinct optimizations in FrostDB.
func (b *OptFloat64Builder) ResetToLength(n int) {
	if b.length != n {
		b.length = n
		b.data = b.data[:n]
		b.validityBitmap = resizeBitmap(b.validityBitmap, n)
	}
}
46 changes: 46 additions & 0 deletions pqarrow/builder/optbuilders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,52 @@ import (
"github.com/polarsignals/frostdb/pqarrow/builder"
)

// TestOptBuilders exercises the behavior shared by every optimized builder:
// appending Go values, overwriting an entry with NULL via SetNull, and building
// an array whose entries reflect both the appended value and the NULL.
func TestOptBuilders(t *testing.T) {
	testCases := []struct {
		b builder.OptimizedBuilder // builder under test
		v any                      // value appended twice to the builder
	}{
		{
			b: builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary),
			v: []byte("hello"),
		},
		{
			b: builder.NewOptBooleanBuilder(arrow.FixedWidthTypes.Boolean),
			v: true,
		},
		{
			b: builder.NewOptFloat64Builder(arrow.PrimitiveTypes.Float64),
			v: 1.0,
		},
		{
			b: builder.NewOptInt32Builder(arrow.PrimitiveTypes.Int32),
			v: int32(123),
		},
		{
			b: builder.NewOptInt64Builder(arrow.PrimitiveTypes.Int64),
			v: int64(123),
		},
	}
	for _, tc := range testCases {
		// Name each subtest after the concrete builder type.
		t.Run(fmt.Sprintf("%T", tc.b), func(t *testing.T) {
			require.NoError(t, builder.AppendGoValue(tc.b, tc.v))
			require.NoError(t, builder.AppendGoValue(tc.b, tc.v))

			// testify takes (expected, actual); keeping that order makes the
			// failure message report the right value as "expected".
			require.Equal(t, 2, tc.b.Len())
			require.True(t, tc.b.IsValid(0))
			require.True(t, tc.b.IsValid(1))

			tc.b.SetNull(1) // overwrite second value with NULL
			require.True(t, tc.b.IsValid(0))
			require.True(t, tc.b.IsNull(1))

			a := tc.b.NewArray()
			require.Equal(t, tc.v, a.GetOneForMarshal(0))
			require.Equal(t, nil, a.GetOneForMarshal(1))
		})
	}
}

// https://github.com/polarsignals/frostdb/issues/270
func TestIssue270(t *testing.T) {
b := builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary)
Expand Down
8 changes: 6 additions & 2 deletions pqarrow/builder/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,14 @@ func AppendGoValue(cb ColumnBuilder, v any) error {
switch b := cb.(type) {
case *OptBinaryBuilder:
return b.Append(v.([]byte))
case *OptInt64Builder:
b.Append(v.(int64))
case *OptBooleanBuilder:
b.AppendSingle(v.(bool))
case *OptFloat64Builder:
b.Append(v.(float64))
case *OptInt32Builder:
b.Append(v.(int32))
case *OptInt64Builder:
b.Append(v.(int64))
case *array.Int64Builder:
b.Append(v.(int64))
case *array.Int32Builder:
Expand Down

0 comments on commit 1fad9e6

Please sign in to comment.