Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dirty example on how to insert and select driver.Valuer and custom slices (try to fix #59 and #61 and #44) #60

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
95 changes: 92 additions & 3 deletions ch/chschema/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chschema

import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -111,17 +112,77 @@ func (c ColumnOf[T]) Slice(s, e int) any {
return c.Column[s:e]
}

func (c *ColumnOf[T]) appendSliceColumn(v reflect.Value) {
colElemType := reflect.TypeOf(c.Column).Elem().Elem()
col := reflect.MakeSlice(reflect.SliceOf(colElemType), 0, v.Len())
for i := 0; i < v.Len(); i++ {
col = reflect.Append(col, v.Index(i).Convert(colElemType))
}
c.Column = append(c.Column, col.Interface().(T))
}

func (c *ColumnOf[T]) AppendValue(v reflect.Value) {
if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && v.Kind() == reflect.Slice {
c.appendSliceColumn(v)
return
}
c.Column = append(c.Column, v.Interface().(T))
}

func (c *ColumnOf[T]) convertSliceColumn(slice T, destSlice reflect.Value) {
sliceValue := reflect.ValueOf(slice)
elemType := destSlice.Type().Elem()

ret := reflect.MakeSlice(reflect.SliceOf(elemType), 0, sliceValue.Len())
for i := 0; i < sliceValue.Len(); i++ {
ret = reflect.Append(ret, sliceValue.Index(i).Convert(elemType))
}

destSlice.Set(ret)
}

func (c *ColumnOf[T]) ConvertAssign(idx int, dest reflect.Value) error {
if reflect.TypeOf(c.Column).Elem().Kind() == reflect.Slice && dest.Kind() == reflect.Slice {
c.convertSliceColumn(c.Column[idx], dest)
return nil
}
dest.Set(reflect.ValueOf(c.Column[idx]))
return nil
}

//------------------------------------------------------------------------------

func convertAssignDriverValue(col any, v reflect.Value) bool {
var ret reflect.Value

if _, ok := v.Interface().(sql.Scanner); !ok {
if !v.CanAddr() {
return false
}
if _, ok := v.Addr().Interface().(sql.Scanner); !ok {
return false
}

ret = reflect.New(v.Type())
if err := ret.Interface().(sql.Scanner).Scan(col); err != nil {
return false
}

v.Set(ret.Elem())
return true
}

ret = reflect.New(v.Type().Elem())
if err := ret.Interface().(sql.Scanner).Scan(col); err != nil {
return false
}

v.Set(ret)
return true
}

//------------------------------------------------------------------------------

type NumericColumnOf[T constraints.Integer | constraints.Float] struct {
ColumnOf[T]
}
Expand All @@ -135,7 +196,11 @@ func (c NumericColumnOf[T]) ConvertAssign(idx int, v reflect.Value) error {
case reflect.Float32, reflect.Float64:
v.SetFloat(float64(c.Column[idx]))
default:
v.Set(reflect.ValueOf(c.Column[idx]))
if !convertAssignDriverValue(c.Column[idx], v) {
v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
return nil
}
return nil
}
Expand All @@ -156,7 +221,7 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error {
v.SetString(c.Column[idx])
return nil
case reflect.Slice:
if v.Type() == bytesType {
if v.Type() == bytesType || v.Type() == reflect.TypeOf((*json.RawMessage)(nil)).Elem() {
v.SetBytes(internal.Bytes(c.Column[idx]))
return nil
}
Expand All @@ -165,7 +230,31 @@ func (c StringColumn) ConvertAssign(idx int, v reflect.Value) error {
dec.UseNumber()
return dec.Decode(v.Addr().Interface())
default:
v.Set(reflect.ValueOf(c.Column[idx]))
if !convertAssignDriverValue(c.Column[idx], v) {
// v.Set(reflect.ValueOf(c.Column[idx]))
return nil
}
return nil
}
return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type())
}

//------------------------------------------------------------------------------

func (c *UInt256Column) ConvertAssign(idx int, v reflect.Value) error {
switch v.Kind() {
case reflect.String:
v.SetString(c.Column[idx].String())
return nil
case reflect.Slice:
if v.Type() == bytesType {
v.SetBytes(internal.Bytes(string(c.Column[idx].Bytes())))
return nil
}
default:
if !convertAssignDriverValue(c.Column[idx].String(), v) {
return fmt.Errorf("ch: convertAssign UInt256 %x", c.Column[idx])
}
return nil
}
return fmt.Errorf("ch: can't scan %s into %s", "string", v.Type())
Expand Down
173 changes: 171 additions & 2 deletions ch/chschema/column_gen.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,42 @@
package chschema

import (
"database/sql/driver"
"encoding/json"
"fmt"
"math/big"
"reflect"
"strconv"
"time"

"github.com/uptrace/go-clickhouse/ch/chproto"
)

func getDriverValue(v reflect.Value) driver.Value {
if v.Kind() == reflect.Pointer && v.IsNil() {
return nil
}

dv, ok := v.Interface().(driver.Valuer)
if !ok {
if !v.CanAddr() {
return nil
}
if dv, ok = v.Addr().Interface().(driver.Valuer); !ok {
return nil
}
}

value, err := dv.Value()
if err != nil {
return nil
}

return value
}

//------------------------------------------------------------------------------

type Int8Column struct {
NumericColumnOf[int8]
}
Expand Down Expand Up @@ -1396,7 +1426,116 @@ func (c *UInt64Column) Type() reflect.Type {
}

func (c *UInt64Column) AppendValue(v reflect.Value) {
c.Column = append(c.Column, uint64(v.Uint()))
switch v.Kind() {
case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
c.Column = append(c.Column, v.Uint())
case reflect.String:
i, err := strconv.ParseInt(v.String(), 10, 64)
if err != nil {
return
}
c.Column = append(c.Column, uint64(i))
case reflect.Pointer:
if v.IsNil() {
c.Column = append(c.Column, 0)
return
}
c.AppendValue(v.Elem())
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
c.Column = append(c.Column, v.Uint())
}
}

//------------------------------------------------------------------------------

type UInt256Column struct {
ColumnOf[*big.Int]
}

var _ Columnar = (*UInt256Column)(nil)

func NewUInt256Column() Columnar {
return new(UInt256Column)
}

var _UInt256Type = reflect.TypeOf((*big.Int)(nil)).Elem()

const u256sz = 256 / 8

func (c *UInt256Column) Type() reflect.Type {
return _UInt256Type
}

func (c *UInt256Column) AppendValue(v reflect.Value) {
switch v.Kind() {
case reflect.Uint, reflect.Uintptr, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
c.Column = append(c.Column, new(big.Int).SetUint64(v.Uint()))
case reflect.String:
i, ok := new(big.Int).SetString(v.String(), 10)
if !ok {
return
}
c.Column = append(c.Column, i)
case reflect.Slice:
ba, ok := v.Interface().([]byte)
if !ok {
return
}
c.Column = append(c.Column, new(big.Int).SetBytes(ba))
case reflect.Pointer:
if v.IsNil() {
c.Column = append(c.Column, new(big.Int))
return
}
c.AppendValue(v.Elem())
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
}
}

func (c *UInt256Column) ReadFrom(rd *chproto.Reader, numRow int) error {
c.AllocForReading(numRow)

for it := range c.Column {
b := make([]byte, u256sz, u256sz)
n, err := rd.Read(b)
if err != nil {
return err
}
if n != u256sz {
return fmt.Errorf("expected read bytes %d, got %d", u256sz, n)
}
for i := 0; i < len(b)/2; i++ {
b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i]
}
c.Column[it] = new(big.Int).SetBytes(b)
}

return nil
}

func (c *UInt256Column) WriteTo(wr *chproto.Writer) error {
for _, n := range c.Column {
if n.BitLen() > 256 {
return fmt.Errorf("%s has bigger len %d than 256", n, n.BitLen())
}
b := make([]byte, u256sz, u256sz)
b = n.FillBytes(b)
for i := 0; i < len(b)/2; i++ {
b[i], b[len(b)-i-1] = b[len(b)-i-1], b[i]
}
wr.Write(b)
}
return nil
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -2201,7 +2340,37 @@ func (c *StringColumn) Type() reflect.Type {
}

func (c *StringColumn) AppendValue(v reflect.Value) {
c.Column = append(c.Column, string(v.String()))
if v.Kind() == reflect.Pointer && v.IsNil() {
c.Column = append(c.Column, "")
return
}

switch vi := v.Interface().(type) {
case string:
c.Column = append(c.Column, vi)
case []byte:
c.Column = append(c.Column, string(vi))
case json.RawMessage:
c.Column = append(c.Column, string(vi))
default:
value := getDriverValue(v)
if value != nil {
c.AppendValue(reflect.ValueOf(value))
return
}
if v.Kind() == reflect.String {
c.Column = append(c.Column, v.String())
return
}
j, err := json.Marshal(vi)
if err != nil {
panic(err)
}
if string(j) == "null" {
j = nil
}
c.Column = append(c.Column, string(j))
}
}

func (c *StringColumn) ReadFrom(rd *chproto.Reader, numRow int) error {
Expand Down
2 changes: 2 additions & 0 deletions ch/chschema/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func ColumnFactory(chType string, typ reflect.Type) NewColumnFunc {
return NewUInt32Column
case chtype.UInt64:
return NewUInt64Column
case chtype.UInt256:
return NewUInt256Column
case chtype.Float32:
return NewFloat32Column
case chtype.Float64:
Expand Down
1 change: 1 addition & 0 deletions ch/chtype/chtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
UInt16 = "UInt16"
UInt32 = "UInt32"
UInt64 = "UInt64"
UInt256 = "UInt256"
Float32 = "Float32"
Float64 = "Float64"
DateTime = "DateTime"
Expand Down
Loading