Skip to content

Commit

Permalink
improve offsets code according to code review
Browse files Browse the repository at this point in the history
  • Loading branch information
rluvaton committed Dec 21, 2024
1 parent 05cf968 commit ec7e135
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 263 deletions.
223 changes: 47 additions & 176 deletions arrow-buffer/src/buffer/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
// under the License.

use crate::buffer::ScalarBuffer;
use crate::{ArrowNativeType, Buffer, MutableBuffer, OffsetBufferBuilder};
use num::Integer;
use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder};
use std::ops::Deref;

/// A non-empty buffer of monotonically increasing, positive integers.
Expand Down Expand Up @@ -134,6 +133,38 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
Self(out.into())
}

/// Returns an iterator over the length of every slot described by this [`OffsetBuffer`]
///
/// Each item is the difference between two consecutive offsets.
///
/// ```
/// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
/// let offsets = OffsetBuffer::<_>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
/// assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![1, 3, 5]);
/// ```
///
/// An empty [`OffsetBuffer`] yields no lengths
/// ```
/// # use arrow_buffer::OffsetBuffer;
/// let offsets = OffsetBuffer::<i32>::new_empty();
/// assert_eq!(offsets.lengths().count(), 0);
/// ```
///
/// The lengths of several [`OffsetBuffer`]s can be chained to build a single merged one
/// ```
/// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
///
/// let buffer1 = OffsetBuffer::<i32>::from_lengths([2, 6, 3, 7, 2]);
/// let buffer2 = OffsetBuffer::<i32>::from_lengths([1, 3, 5, 7, 9]);
///
/// let merged = OffsetBuffer::<i32>::from_lengths(
///     vec![buffer1, buffer2].iter().flat_map(|x| x.lengths())
/// );
///
/// assert_eq!(merged.lengths().collect::<Vec<_>>(), &[2, 6, 3, 7, 2, 1, 3, 5, 7, 9]);
/// ```
pub fn lengths(&self) -> impl ExactSizeIterator<Item = usize> + '_ {
    // Every window holds a pair of neighbouring offsets: [start, end].
    self.0.windows(2).map(|pair| {
        let start = pair[0].as_usize();
        let end = pair[1].as_usize();
        end - start
    })
}

/// Free up unused memory.
pub fn shrink_to_fit(&mut self) {
self.0.shrink_to_fit();
Expand Down Expand Up @@ -163,44 +194,6 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {
}
}

impl<O: ArrowNativeType + Integer + Copy> OffsetBuffer<O> {
/// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`]
///
///
/// ```
/// # use arrow_buffer::{OffsetBuffer, ScalarBuffer};
/// // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
/// // The output should be
/// // [ 0, 3, 5, 7, 7, 13, 13, 14]
/// let buffers = [
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 3, 5])),
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 2, 2, 8])),
/// OffsetBuffer::<i32>::new_empty(),
/// OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 0, 1])),
/// ];
///
/// let buffer = OffsetBuffer::<i32>::merge(&buffers);
/// assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]);
/// ```
///
pub fn merge<'a, Iter>(offset_buffers_iterator: Iter) -> Self
where
Iter: IntoIterator<Item = &'a OffsetBuffer<O>>,
<Iter as IntoIterator>::IntoIter: 'a + Clone,
{
let iter = MergeBuffersIter::from(offset_buffers_iterator.into_iter());
if iter.len() == 0 {
return Self::new_empty();
}

let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };

let scalar_buffer: ScalarBuffer<O> = buffer.into();

Self::new(scalar_buffer)
}
}

impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
type Target = [T];

Expand All @@ -223,119 +216,6 @@ impl<O: ArrowNativeType> From<OffsetBufferBuilder<O>> for OffsetBuffer<O> {
}
}

/// Iterator that walks several offset slices one after another and yields a single
/// merged sequence of offsets.
struct MergeBuffersIter<'a, Offset: Integer + Copy> {
// Number of offsets still to be yielded; reported by `size_hint` and `len`.
size: usize,
// The offset slices that have not been started yet.
iterator: Box<dyn Iterator<Item = &'a [Offset]> + 'a>,
// The offsets of the slice currently being emitted.
inner_iterator: Box<dyn Iterator<Item = Offset> + 'a>,
// Amount added to every offset yielded from `inner_iterator`.
advance_by: Offset,
// Value `advance_by` takes once `inner_iterator` is exhausted.
next_advance_by: Offset,
}

impl<'a, Offset, Iter> From<Iter> for MergeBuffersIter<'a, Offset>
where
    Offset: ArrowNativeType + Integer + Copy,
    Iter: Iterator<Item = &'a OffsetBuffer<Offset>> + Clone + 'a,
{
    /// Builds the merging iterator from a cloneable iterator of offset buffers.
    ///
    /// The input is traversed twice: one copy is kept for the actual merge and the
    /// other is consumed up front to compute the total number of offsets.
    fn from(offset_buffers: Iter) -> Self {
        let buffers_to_merge = offset_buffers.clone();
        let total_len = Self::calculate_size(offset_buffers);

        Self::new(buffers_to_merge, total_len)
    }
}

impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> {
    /// Creates the merging iterator.
    ///
    /// `size` is the total number of offsets the iterator will yield (see
    /// [`Self::calculate_size`]); when it is `0` the iterator yields nothing.
    fn new(
        offset_buffers_iterator: impl Iterator<Item = &'a OffsetBuffer<Offset>> + 'a,
        size: usize,
    ) -> Self {
        // Seed the output with the leading `0` offset, unless there is nothing to merge.
        let inner_iterator: Box<dyn Iterator<Item = Offset> + 'a> = if size == 0 {
            Box::new(std::iter::empty())
        } else {
            Box::new(std::iter::once(Offset::zero()))
        };

        Self {
            size,
            // Boxed exactly once: wrapping an already boxed iterator in another `Box`
            // would only add an allocation and an extra level of indirection.
            iterator: Box::new(
                offset_buffers_iterator
                    // Buffers with fewer than 2 offsets (start and end) describe no
                    // slot and contribute nothing to the merged output.
                    .filter(|offset_buffer| offset_buffer.len() > 1)
                    .map(|offset_buffer| offset_buffer.inner().as_ref()),
            ),
            inner_iterator,
            advance_by: Offset::zero(),
            next_advance_by: Offset::zero(),
        }
    }

    /// Computes how many offsets the merged buffer will contain.
    ///
    /// The exact length is needed up front so the merged buffer can be built through
    /// the trusted-length constructor. Returns `0` when no buffer contributes an offset.
    fn calculate_size(buffers: impl Iterator<Item = &'a OffsetBuffer<Offset>>) -> usize {
        // Every buffer contributes all of its offsets except its leading one;
        // `saturating_sub` keeps a buffer without any offsets at zero.
        let merged_offset_length: usize = buffers.map(|x| x.len().saturating_sub(1)).sum();

        if merged_offset_length == 0 {
            0
        } else {
            // One extra slot for the single leading `0` offset of the merged buffer.
            merged_offset_length + 1
        }
    }
}

impl<Offset: ArrowNativeType + Integer + Copy> Iterator for MergeBuffersIter<'_, Offset> {
    type Item = Offset;

    /// Yields the next merged offset.
    ///
    /// Offsets of the current buffer are emitted first, shifted by the total length
    /// of all buffers already consumed. When the current buffer is exhausted the next
    /// one is loaded; `None` is returned once no buffers remain.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // 1. Drain the current buffer first, shifting each offset by the total
            //    length of the buffers that came before it.
            if let Some(offset) = self.inner_iterator.next() {
                self.size -= 1;
                return Some(offset + self.advance_by);
            }

            self.advance_by = self.next_advance_by;

            // 2. If there are no more buffers, then we are finished.
            let current_offset_buffer = self.iterator.next()?;

            // 3. Grab the first and last offsets of the buffer. Buffers with fewer than
            //    2 offsets are filtered out on construction; skip them defensively.
            let (first_value, last_value) = match current_offset_buffer {
                [first, .., last] => (*first, *last),
                _ => continue,
            };

            // 4. The next buffer must be shifted by the length covered so far. The
            //    length of this buffer is `last - first`, as a buffer is not required
            //    to start at offset 0.
            self.next_advance_by = self.advance_by + (last_value - first_value);

            // 5. Skip the leading offset (the merged output already contains the
            //    boundary it represents) and rebase the rest so they start from 0.
            self.inner_iterator = Box::new(
                current_offset_buffer
                    .iter()
                    .skip(1)
                    .map(move |&offset| offset - first_value),
            );
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.size, Some(self.size))
    }
}

// The number of remaining offsets is tracked in `size`, so the exact length is
// always known without consuming the iterator.
impl<Offset: ArrowNativeType + Integer + Copy> ExactSizeIterator for MergeBuffersIter<'_, Offset> {
// Returns the number of offsets that have not been yielded yet.
fn len(&self) -> usize {
self.size
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -398,31 +278,22 @@ mod tests {
}

#[test]
fn merge_from() {
// [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]]
// The output should be
// [ 0, 3, 5, 7, 7, 13, 13, 14]
//
let buffers = [
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 3, 5])),
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 2, 2, 8])),
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new(ScalarBuffer::from(vec![0, 0, 1])),
];

let buffer = OffsetBuffer::<i32>::merge(&buffers);
assert_eq!(buffer.as_ref(), &[0, 3, 5, 7, 7, 13, 13, 14]);
fn get_lengths() {
let offsets = OffsetBuffer::<i32>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![1, 3, 5]);
}

#[test]
fn get_lengths_should_be_with_fixed_size() {
let offsets = OffsetBuffer::<i32>::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 9]));
let iter = offsets.lengths();
assert_eq!(iter.size_hint(), (3, Some(3)));
assert_eq!(iter.len(), 3);
}

#[test]
fn merge_from_empty() {
let buffers = [
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new_empty(),
OffsetBuffer::<i32>::new_empty(),
];

let buffer = OffsetBuffer::<i32>::merge(&buffers);
assert_eq!(buffer.as_ref(), OffsetBuffer::<i32>::new_empty().as_ref());
fn get_lengths_from_empty_offset_buffer_should_be_empty_iterator() {
let offsets = OffsetBuffer::<i32>::new_empty();
assert_eq!(offsets.lengths().collect::<Vec<usize>>(), vec![]);
}
}
91 changes: 4 additions & 87 deletions arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ fn concat_list_of_dictionaries<OffsetSize: OffsetSizeTrait, K: ArrowDictionaryKe
let array = unsafe { DictionaryArray::new_unchecked(keys, merged.values) };

// Merge value offsets from the lists
let value_offset_buffer = OffsetBuffer::merge(lists.iter().map(|x| x.offsets()))
.into_inner()
.into_inner();
let value_offset_buffer =
OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()))
.into_inner()
.into_inner();

let builder = ArrayDataBuilder::new(arrays[0].data_type().clone())
.len(output_len)
Expand Down Expand Up @@ -1031,90 +1032,6 @@ mod tests {
);
}

// Concatenating multi-row lists of dictionaries must keep every row in order and
// produce a dictionary without duplicated values.
#[test]
fn concat_dictionary_list_array_with_multiple_rows() {
    let scalars = vec![
        create_list_of_dict(vec![
            // Row 1
            Some(vec![Some("a"), Some("c")]),
            // Row 2
            None,
            // Row 3
            Some(vec![Some("f"), Some("g"), None]),
            // Row 4
            Some(vec![Some("c"), Some("f")]),
        ]),
        create_list_of_dict(vec![
            // Row 1
            Some(vec![Some("a")]),
            // Row 2
            Some(vec![]),
            // Row 3
            Some(vec![None, Some("b")]),
            // Row 4
            Some(vec![Some("d"), Some("e")]),
        ]),
        create_list_of_dict(vec![
            // Row 1
            Some(vec![Some("g")]),
            // Row 2
            Some(vec![Some("h"), Some("i")]),
            // Row 3
            Some(vec![Some("j"), Some("a")]),
            // Row 4
            Some(vec![Some("d"), Some("e")]),
        ]),
    ];
    let arrays = scalars
        .iter()
        .map(|a| a as &(dyn Array))
        .collect::<Vec<_>>();
    let concat_res = concat(arrays.as_slice()).unwrap();

    let expected_list = create_list_of_dict(vec![
        // First list:

        // Row 1
        Some(vec![Some("a"), Some("c")]),
        // Row 2
        None,
        // Row 3
        Some(vec![Some("f"), Some("g"), None]),
        // Row 4
        Some(vec![Some("c"), Some("f")]),
        // Second list:

        // Row 1
        Some(vec![Some("a")]),
        // Row 2
        Some(vec![]),
        // Row 3
        Some(vec![None, Some("b")]),
        // Row 4
        Some(vec![Some("d"), Some("e")]),
        // Third list:

        // Row 1
        Some(vec![Some("g")]),
        // Row 2
        Some(vec![Some("h"), Some("i")]),
        // Row 3
        Some(vec![Some("j"), Some("a")]),
        // Row 4
        Some(vec![Some("d"), Some("e")]),
    ]);

    let list = concat_res.as_list::<i32>();

    // `zip` stops at the shorter side, so check the row count explicitly; otherwise
    // a result with missing or extra rows would still pass the comparison below.
    assert_eq!(list.len(), expected_list.len());

    // Assert that the list is equal to the expected list
    list.iter().zip(expected_list.iter()).for_each(|(a, b)| {
        assert_eq!(a, b);
    });

    assert_dictionary_has_unique_values::<_, StringArray>(
        list.values().as_dictionary::<Int32Type>(),
    );
}

fn create_single_row_list_of_dict(
list_items: Vec<Option<impl AsRef<str>>>,
) -> GenericListArray<i32> {
Expand Down

0 comments on commit ec7e135

Please sign in to comment.