grafana/vendor/github.com/linkedin/goavro/array.go

227 lines
8.2 KiB
Go

// Copyright [2017] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package goavro
import (
"fmt"
"io"
"math"
"reflect"
)
func makeArrayCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
// array type must have items
itemSchema, ok := schemaMap["items"]
if !ok {
return nil, fmt.Errorf("Array ought to have items key")
}
itemCodec, err := buildCodec(st, enclosingNamespace, itemSchema)
if err != nil {
return nil, fmt.Errorf("Array items ought to be valid Avro type: %s", err)
}
return &Codec{
typeName: &name{"array", nullNamespace},
nativeFromBinary: func(buf []byte) (interface{}, []byte, error) {
var value interface{}
var err error
// block count and block size
if value, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary array block count: %s", err)
}
blockCount := value.(int64)
if blockCount < 0 {
// NOTE: A negative block count implies there is a long encoded
// block size following the negative block count. We have no use
// for the block size in this decoder, so we read and discard
// the value.
if blockCount == math.MinInt64 {
// The minimum number for any signed numerical type can never be made positive
return nil, nil, fmt.Errorf("cannot decode binary array with block count: %d", math.MinInt64)
}
blockCount = -blockCount // convert to its positive equivalent
if _, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary array block size: %s", err)
}
}
// Ensure block count does not exceed some sane value.
if blockCount > MaxBlockCount {
return nil, nil, fmt.Errorf("cannot decode binary array when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
}
// NOTE: While the attempt of a RAM optimization shown below is not
// necessary, many encoders will encode all items in a single block.
// We can optimize amount of RAM allocated by runtime for the array
// by initializing the array for that number of items.
arrayValues := make([]interface{}, 0, blockCount)
for blockCount != 0 {
// Decode `blockCount` datum values from buffer
for i := int64(0); i < blockCount; i++ {
if value, buf, err = itemCodec.nativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary array item %d: %s", i+1, err)
}
arrayValues = append(arrayValues, value)
}
// Decode next blockCount from buffer, because there may be more blocks
if value, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary array block count: %s", err)
}
blockCount = value.(int64)
if blockCount < 0 {
// NOTE: A negative block count implies there is a long
// encoded block size following the negative block count. We
// have no use for the block size in this decoder, so we
// read and discard the value.
if blockCount == math.MinInt64 {
// The minimum number for any signed numerical type can
// never be made positive
return nil, nil, fmt.Errorf("cannot decode binary array with block count: %d", math.MinInt64)
}
blockCount = -blockCount // convert to its positive equivalent
if _, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary array block size: %s", err)
}
}
// Ensure block count does not exceed some sane value.
if blockCount > MaxBlockCount {
return nil, nil, fmt.Errorf("cannot decode binary array when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
}
}
return arrayValues, buf, nil
},
binaryFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
arrayValues, err := convertArray(datum)
if err != nil {
return nil, fmt.Errorf("cannot encode binary array: %s", err)
}
arrayLength := int64(len(arrayValues))
var alreadyEncoded, remainingInBlock int64
for i, item := range arrayValues {
if remainingInBlock == 0 { // start a new block
remainingInBlock = arrayLength - alreadyEncoded
if remainingInBlock > MaxBlockCount {
// limit block count to MacBlockCount
remainingInBlock = MaxBlockCount
}
buf, _ = longBinaryFromNative(buf, remainingInBlock)
}
if buf, err = itemCodec.binaryFromNative(buf, item); err != nil {
return nil, fmt.Errorf("cannot encode binary array item %d: %v: %s", i+1, item, err)
}
remainingInBlock--
alreadyEncoded++
}
return longBinaryFromNative(buf, 0) // append trailing 0 block count to signal end of Array
},
nativeFromTextual: func(buf []byte) (interface{}, []byte, error) {
var arrayValues []interface{}
var value interface{}
var err error
var b byte
if buf, err = advanceAndConsume(buf, '['); err != nil {
return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
}
if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
}
// NOTE: Special case for empty array
if buf[0] == ']' {
return arrayValues, buf[1:], nil
}
// NOTE: Also terminates when read ']' byte.
for len(buf) > 0 {
// decode value
value, buf, err = itemCodec.nativeFromTextual(buf)
if err != nil {
return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
}
arrayValues = append(arrayValues, value)
// either comma or closing curly brace
if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
}
switch b = buf[0]; b {
case ']':
return arrayValues, buf[1:], nil
case ',':
// no-op
default:
return nil, nil, fmt.Errorf("cannot decode textual array: expected ',' or ']'; received: %q", b)
}
// NOTE: consume comma from above
if buf, _ = advanceToNonWhitespace(buf[1:]); len(buf) == 0 {
return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
}
}
return nil, buf, io.ErrShortBuffer
},
textualFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
arrayValues, err := convertArray(datum)
if err != nil {
return nil, fmt.Errorf("cannot encode textual array: %s", err)
}
var atLeastOne bool
buf = append(buf, '[')
for i, item := range arrayValues {
atLeastOne = true
// Encode value
buf, err = itemCodec.textualFromNative(buf, item)
if err != nil {
// field was specified in datum; therefore its value was invalid
return nil, fmt.Errorf("cannot encode textual array item %d; %v: %s", i+1, item, err)
}
buf = append(buf, ',')
}
if atLeastOne {
return append(buf[:len(buf)-1], ']'), nil
}
return append(buf, ']'), nil
},
}, nil
}
// convertArray converts interface{} to []interface{} if possible.
func convertArray(datum interface{}) ([]interface{}, error) {
arrayValues, ok := datum.([]interface{})
if ok {
return arrayValues, nil
}
// NOTE: When given a slice of any other type, zip values to
// items as a convenience to client.
v := reflect.ValueOf(datum)
if v.Kind() != reflect.Slice {
return nil, fmt.Errorf("cannot create []interface{}: expected slice; received: %T", datum)
}
// NOTE: Two better alternatives to the current algorithm are:
// (1) mutate the reflection tuple underneath to convert the
// []int, for example, to []interface{}, with O(1) complexity
// (2) use copy builtin to zip the data items over with O(n) complexity,
// but more efficient than what's below.
// Suggestions?
arrayValues = make([]interface{}, v.Len())
for idx := 0; idx < v.Len(); idx++ {
arrayValues[idx] = v.Index(idx).Interface()
}
return arrayValues, nil
}