Skip to content

Commit

Permalink
Fix panic in GroupInfo::members
Browse files Browse the repository at this point in the history
I recently observed undefined behavior in one of the projects I'm
working on because [slice::from_raw_parts] is called without a
null-check in `GroupInfo::members`. This undefined behavior was present
when iterating over the resulting slice and it would just terminate
prematurely when trying to chain multiple iterators. The function is
pretty strict about what kind of pointers it accepts:

> data must be non-null and aligned even for zero-length slices.

This undefined behavior has become a panic in debug builds in [Rust 1.78.0]:

> For example, slice::from_raw_parts requires an aligned non-null pointer.
> The following use of a purposely-misaligned pointer has undefined behavior,
> and while if you were unlucky it may have appeared to "work" in the past,
> the debug assertion can now catch it:

Cause is found in [rdkafka.c]. I see there are more uses of
`slice::from_raw_parts` so I replaced all of them except a call to
`Vec::from_raw_parts` which seems fine. I'd appreciate feedback!

[slice::from_raw_parts]: https://doc.rust-lang.org/std/slice/fn.from_raw_parts.html
[Rust 1.78.0]: https://blog.rust-lang.org/2024/05/02/Rust-1.78.0.html#asserting-unsafe-preconditions
[rdkafka.c]: https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src/rdkafka.c#L4668-L4670
  • Loading branch information
jeremija committed May 6, 2024
1 parent e69c2aa commit 4470b95
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::ffi::CString;
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;
Expand Down
34 changes: 12 additions & 22 deletions src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use std::ffi::CStr;
use std::fmt;
use std::slice;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::util::{KafkaDrop, NativePtr};
use crate::util::ptr_to_opt_slice;
use crate::util::{ptr_to_slice, KafkaDrop, NativePtr};

/// Group member information container.
pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
Expand Down Expand Up @@ -43,28 +43,20 @@ impl GroupMemberInfo {
/// Return the metadata of the member.
pub fn metadata(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_metadata.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_metadata as *const u8,
self.0.member_metadata_size as usize,
))
}
ptr_to_opt_slice(
self.0.member_metadata as *const u8,
self.0.member_metadata_size as usize,
)
}
}

/// Return the partition assignment of the member.
pub fn assignment(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_assignment.is_null() {
None
} else {
Some(slice::from_raw_parts::<u8>(
self.0.member_assignment as *const u8,
self.0.member_assignment_size as usize,
))
}
ptr_to_opt_slice(
self.0.member_assignment as *const u8,
self.0.member_assignment_size as usize,
)
}
}
}
Expand All @@ -85,7 +77,7 @@ impl GroupInfo {
/// Returns the members of the group.
pub fn members(&self) -> &[GroupMemberInfo] {
unsafe {
slice::from_raw_parts(
ptr_to_slice(
self.0.members as *const GroupMemberInfo,
self.0.member_cnt as usize,
)
Expand Down Expand Up @@ -149,8 +141,6 @@ impl GroupList {

/// Returns all the groups in the list.
pub fn groups(&self) -> &[GroupInfo] {
unsafe {
slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize)
}
unsafe { ptr_to_slice(self.0.groups as *const GroupInfo, self.0.group_cnt as usize) }
}
}
10 changes: 5 additions & 5 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl Headers for BorrowedHeaders {
Some(Header {
key: CStr::from_ptr(name_ptr).to_str().unwrap(),
value: (!value_ptr.is_null())
.then(|| util::ptr_to_slice(value_ptr, value_size)),
.then(|| util::ptr_to_slice(value_ptr as *const u8, value_size)),
})
}
}
Expand Down Expand Up @@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
type Headers = BorrowedHeaders;

fn key(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
unsafe { util::ptr_to_opt_slice(self.ptr.key as *const u8, self.ptr.key_len) }
}

fn payload(&self) -> Option<&[u8]> {
unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
unsafe { util::ptr_to_opt_slice(self.ptr.payload as *const u8, self.ptr.len) }
}

unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
util::ptr_to_opt_mut_slice(self.ptr.payload as *mut u8, self.ptr.len)
}

fn topic(&self) -> &str {
unsafe {
CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
.to_str()
.expect("Topic name is not valid UTF-8")
}
Expand Down
13 changes: 6 additions & 7 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Cluster metadata.
use std::ffi::CStr;
use std::slice;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::error::IsError;
use crate::util::{KafkaDrop, NativePtr};
use crate::util::{ptr_to_slice, KafkaDrop, NativePtr};

/// Broker metadata information.
pub struct MetadataBroker(RDKafkaMetadataBroker);
Expand Down Expand Up @@ -60,12 +59,12 @@ impl MetadataPartition {

/// Returns the broker IDs of the replicas.
pub fn replicas(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
unsafe { ptr_to_slice(self.0.replicas, self.0.replica_cnt as usize) }
}

/// Returns the broker IDs of the in-sync replicas.
pub fn isr(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
unsafe { ptr_to_slice(self.0.isrs, self.0.isr_cnt as usize) }
}
}

Expand All @@ -85,7 +84,7 @@ impl MetadataTopic {
/// Returns the partition metadata information for all the partitions.
pub fn partitions(&self) -> &[MetadataPartition] {
unsafe {
slice::from_raw_parts(
ptr_to_slice(
self.0.partitions as *const MetadataPartition,
self.0.partition_cnt as usize,
)
Expand Down Expand Up @@ -141,7 +140,7 @@ impl Metadata {
/// Returns the metadata information for all the brokers in the cluster.
pub fn brokers(&self) -> &[MetadataBroker] {
unsafe {
slice::from_raw_parts(
ptr_to_slice(
self.0.brokers as *const MetadataBroker,
self.0.broker_cnt as usize,
)
Expand All @@ -151,7 +150,7 @@ impl Metadata {
/// Returns the metadata information for all the topics in the cluster.
pub fn topics(&self) -> &[MetadataTopic] {
unsafe {
slice::from_raw_parts(
ptr_to_slice(
self.0.topics as *const MetadataTopic,
self.0.topic_cnt as usize,
)
Expand Down
9 changes: 2 additions & 7 deletions src/producer/base_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::slice;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -67,7 +66,7 @@ use crate::producer::{
DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
};
use crate::topic_partition_list::TopicPartitionList;
use crate::util::{IntoOpaque, NativePtr, Timeout};
use crate::util::{ptr_to_opt_slice, IntoOpaque, NativePtr, Timeout};

pub use crate::message::DeliveryResult;

Expand Down Expand Up @@ -217,11 +216,7 @@ unsafe extern "C" fn partitioner_cb<Part: Partitioner, C: ProducerContext<Part>>

let is_partition_available = |p: i32| rdsys::rd_kafka_topic_partition_available(topic, p) == 1;

let key = if keydata.is_null() {
None
} else {
Some(slice::from_raw_parts(keydata as *const u8, keylen))
};
let key = ptr_to_opt_slice(keydata, keylen);

let producer_context = &mut *(rkt_opaque as *mut C);

Expand Down
10 changes: 5 additions & 5 deletions src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;
use std::str;

use libc::c_void;
Expand Down Expand Up @@ -139,7 +138,8 @@ impl<'a> TopicPartitionListElem<'a> {

/// Returns the optional metadata associated with the entry.
pub fn metadata(&self) -> &str {
let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
let bytes =
unsafe { util::ptr_to_slice(self.ptr.metadata as *const u8, self.ptr.metadata_size) };
str::from_utf8(bytes).expect("Metadata is not UTF-8")
}

Expand Down Expand Up @@ -317,7 +317,7 @@ impl TopicPartitionList {

/// Sets all partitions in the list to the specified offset.
pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
for elem_ptr in slice {
let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
elem.set_offset(offset)?;
Expand All @@ -327,7 +327,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list.
pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
Expand All @@ -337,7 +337,7 @@ impl TopicPartitionList {

/// Returns all the elements of the list that belong to the specified topic.
pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
let slice = unsafe { util::ptr_to_mut_slice(self.ptr.elems, self.count()) };
let mut vec = Vec::with_capacity(slice.len());
for elem_ptr in slice {
let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
Expand Down
25 changes: 16 additions & 9 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,39 @@ pub fn current_time_millis() -> i64 {

/// Converts a pointer to an array to an optional slice. If the pointer is null,
/// returns `None`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const T, size: usize) -> Option<&'a [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts::<T>(ptr as *const T, size))
Some(slice::from_raw_parts(ptr, size))
}
}

pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(
ptr: *const c_void,
size: usize,
) -> Option<&'a mut [T]> {
pub(crate) unsafe fn ptr_to_opt_mut_slice<'a, T>(ptr: *mut T, size: usize) -> Option<&'a mut [T]> {
if ptr.is_null() {
None
} else {
Some(slice::from_raw_parts_mut::<T>(ptr as *mut T, size))
Some(slice::from_raw_parts_mut(ptr, size))
}
}

/// Converts a pointer to an array to a slice. If the pointer is null or the
/// size is zero, returns a zero-length slice..
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const T, size: usize) -> &'a [T] {
if ptr.is_null() || size == 0 {
&[][..]
} else {
slice::from_raw_parts::<T>(ptr as *const T, size)
slice::from_raw_parts(ptr, size)
}
}

/// Converts a pointer to an array to a mutable slice. If the pointer is null
/// or the size is zero, returns a zero-length slice.
pub(crate) unsafe fn ptr_to_mut_slice<'a, T>(ptr: *mut T, size: usize) -> &'a mut [T] {
if ptr.is_null() || size == 0 {
&mut [][..]
} else {
slice::from_raw_parts_mut(ptr, size)
}
}

Expand Down

0 comments on commit 4470b95

Please sign in to comment.