Skip to content

Commit

Permalink
Enhance Datumaro data format stream importer performance (#1153)
Browse files Browse the repository at this point in the history
- Ticket no. 120785
- Replace the streaming import logic with `DatumPageMapper`, implemented in Rust

| Before | After |
| :-: | :-: |
| ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/0a06ddc0-5256-45b4-af03-e9299b8e61b8) | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/af76210b-8fb5-4b30-aec1-2b5a22856ef7) |

Signed-off-by: Kim, Vinnam <[email protected]>
  • Loading branch information
vinnamkim authored Sep 22, 2023
1 parent 0e8312b commit 45958aa
Show file tree
Hide file tree
Showing 13 changed files with 659 additions and 311 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## \[Unreleased\]

### Enhancements
- Enhance Datumaro data format stream importer performance
(<https://github.com/openvinotoolkit/datumaro/pull/1153>)

## 15/09/2023 - Release 1.5.0
### New features
- Add tabular data import/export
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pyo3 = "0.19.2"
serde = { version = "1.0.180", features = ["derive"] }
serde_json = "1.0.104"
strum = { version = "0.25", features = ["derive"] }
tempfile = "3.8.0"
246 changes: 119 additions & 127 deletions rust/src/coco_page_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,83 @@ use std::{
use strum::EnumString;

use crate::{
page_mapper::{JsonPageMapper, ParsedJsonSection},
page_maps::{AnnPageMap, ImgPageMap, JsonDict},
utils::{invalid_data, parse_serde_json_value, read_skipping_ws},
utils::{convert_to_py_object, invalid_data, parse_serde_json_value, read_skipping_ws},
};
use pyo3::{prelude::*, types::PyList};
use std::{fs::File, io::BufReader, path::Path};

#[derive(EnumString, Debug)]
pub enum CocoJsonSection {
enum CocoJsonSection {
#[strum(ascii_case_insensitive)]
LICENSES(JsonDict),
#[strum(ascii_case_insensitive)]
INFO(JsonDict),
#[strum(ascii_case_insensitive)]
CATEGORIES(JsonDict),
#[strum(ascii_case_insensitive)]
IMAGES(ImgPageMap),
IMAGES(ImgPageMap<i64>),
#[strum(ascii_case_insensitive)]
ANNOTATIONS(AnnPageMap),
}

impl ParsedJsonSection for CocoJsonSection {
    /// Parses the value of one top-level COCO JSON section.
    ///
    /// `buf_key` is the section key that was just read from the stream and
    /// `reader` is positioned somewhere after that key. The key is matched
    /// against the `CocoJsonSection` variants, the stream is advanced past the
    /// `:` separator, and the section value is then parsed from `reader`.
    ///
    /// # Errors
    ///
    /// Returns an `invalid_data` error when `buf_key` does not name a known
    /// section, and propagates any error raised while querying the stream
    /// position or parsing the section value.
    fn parse(
        buf_key: String,
        mut reader: impl Read + Seek,
    ) -> Result<Box<CocoJsonSection>, io::Error> {
        // Resolve the key first so that an unknown key is reported with the
        // stream position at which it was encountered.
        let section_kind = match CocoJsonSection::from_str(buf_key.as_str()) {
            Ok(kind) => kind,
            Err(e) => {
                let cur_pos = reader.stream_position()?;
                let msg = format!("Unknown key: {} at pos: {}", e, cur_pos);
                return Err(invalid_data(msg.as_str()));
            }
        };

        // Consume bytes until the ':' separator has been read. A read error
        // also ends the scan; the value parser below then runs on whatever
        // remains in the stream.
        loop {
            match read_skipping_ws(&mut reader) {
                Ok(byte) if byte != b':' => continue,
                _ => break,
            }
        }

        // The payload carried by `section_kind` is only a placeholder produced
        // by `from_str`; the real value is read from the stream here.
        let parsed = match section_kind {
            CocoJsonSection::LICENSES(_) => {
                CocoJsonSection::LICENSES(parse_serde_json_value(reader)?)
            }
            CocoJsonSection::INFO(_) => CocoJsonSection::INFO(parse_serde_json_value(reader)?),
            CocoJsonSection::CATEGORIES(_) => {
                CocoJsonSection::CATEGORIES(parse_serde_json_value(reader)?)
            }
            CocoJsonSection::IMAGES(_) => CocoJsonSection::IMAGES(ImgPageMap::from_reader(reader)?),
            CocoJsonSection::ANNOTATIONS(_) => {
                CocoJsonSection::ANNOTATIONS(AnnPageMap::from_reader(reader)?)
            }
        };

        Ok(Box::new(parsed))
    }
}

#[derive(Debug)]
pub struct CocoPageMapper {
struct CocoPageMapperImpl {
licenses: JsonDict,
info: JsonDict,
categories: JsonDict,
images: ImgPageMap,
images: ImgPageMap<i64>,
annotations: AnnPageMap,
}

impl CocoPageMapper {
impl JsonPageMapper<CocoJsonSection> for CocoPageMapperImpl {}

impl CocoPageMapperImpl {
pub fn licenses(&self) -> &JsonDict {
return &self.licenses;
}
Expand All @@ -51,7 +100,7 @@ impl CocoPageMapper {
}
pub fn get_item_dict(
&self,
img_id: i64,
img_id: &i64,
mut reader: impl Read + Seek,
) -> Result<JsonDict, io::Error> {
self.images.get_dict(&mut reader, img_id)
Expand All @@ -63,7 +112,6 @@ impl CocoPageMapper {
) -> Result<Vec<JsonDict>, io::Error> {
self.annotations.get_anns(&mut reader, img_id)
}

pub fn new(mut reader: impl Read + Seek) -> Result<Self, io::Error> {
let sections = Self::parse_json(&mut reader)?;

Expand All @@ -74,7 +122,7 @@ impl CocoPageMapper {
let mut annotations = None;

for section in sections {
match section {
match *section {
CocoJsonSection::LICENSES(v) => {
licenses = Some(v);
}
Expand All @@ -100,141 +148,80 @@ impl CocoPageMapper {
let annotations =
annotations.ok_or(invalid_data("Cannot find the annotations section."))?;

Ok(CocoPageMapper {
Ok(CocoPageMapperImpl {
licenses,
info,
categories,
images,
annotations,
})
}
}

fn parse_json(mut reader: impl Read + Seek) -> Result<Vec<CocoJsonSection>, io::Error> {
let mut brace_level = 0;
let mut coco_json_sections = Vec::new();

while let Ok(c) = read_skipping_ws(&mut reader) {
match c {
b'{' => brace_level += 1,
b'"' => {
let mut buf_key = Vec::new();
while let Ok(c) = read_skipping_ws(&mut reader) {
if c == b'"' {
break;
}
buf_key.push(c);
}
match String::from_utf8(buf_key.clone()) {
Ok(key) => {
let section = Self::parse_section_from_key(key, &mut reader)?;
coco_json_sections.push(section);
}
Err(e) => {
let cur_pos = reader.stream_position()?;
let msg = format!(
"Section key buffer, {:?} is invalid at pos: {}. {}",
buf_key, cur_pos, e
);
let err = invalid_data(msg.as_str());
return Err(err);
}
}
}
b',' => {
continue;
}
b'}' => {
brace_level -= 1;
if brace_level == 0 {
break;
}
}
_ => {
let cur_pos = reader.stream_position()?;
let msg = format!("{} is invalid character at pos: {}", c, cur_pos);
let err = invalid_data(msg.as_str());
return Err(err);
}
}
}
Ok(coco_json_sections)
/// Python-exposed wrapper (via `#[pyclass]`) that pairs a buffered file
/// reader with the `CocoPageMapperImpl` used for lookups on that file.
#[pyclass]
pub struct CocoPageMapper {
// Buffered handle to the annotation file; lent mutably to the mapper for
// each item/annotation lookup.
reader: BufReader<File>,
// Section index and metadata produced by `CocoPageMapperImpl::new`.
mapper: CocoPageMapperImpl,
}

#[pymethods]
impl CocoPageMapper {
#[new]
fn py_new(path: String) -> PyResult<Self> {
let file = File::open(Path::new(&path))?;
let mut reader = BufReader::new(file);
let mapper = CocoPageMapperImpl::new(&mut reader)?;

Ok(CocoPageMapper { reader, mapper })
}

fn parse_section_from_key(
buf_key: String,
mut reader: impl Read + Seek,
) -> Result<CocoJsonSection, io::Error> {
match CocoJsonSection::from_str(buf_key.as_str()) {
Ok(curr_key) => {
while let Ok(c) = read_skipping_ws(&mut reader) {
if c == b':' {
break;
}
}
match curr_key {
CocoJsonSection::LICENSES(_) => {
let v = parse_serde_json_value(reader)?;
Ok(CocoJsonSection::LICENSES(v))
}
CocoJsonSection::INFO(_) => {
let v = parse_serde_json_value(reader)?;
Ok(CocoJsonSection::INFO(v))
}
CocoJsonSection::CATEGORIES(_) => {
let v = parse_serde_json_value(reader)?;
Ok(CocoJsonSection::CATEGORIES(v))
}
CocoJsonSection::IMAGES(_) => {
let v = ImgPageMap::from_reader(reader)?;
Ok(CocoJsonSection::IMAGES(v))
}
CocoJsonSection::ANNOTATIONS(_) => {
let v = AnnPageMap::from_reader(reader)?;
Ok(CocoJsonSection::ANNOTATIONS(v))
}
}
}
Err(e) => {
let cur_pos = reader.stream_position()?;
let msg = format!("Unknown key: {} at pos: {}", e, cur_pos);
Err(invalid_data(msg.as_str()))
}
}
/// Returns the mapper's `licenses` section converted to a Python object.
fn licenses(self_: PyRef<Self>) -> PyResult<PyObject> {
    let py = self_.py();
    convert_to_py_object(self_.mapper.licenses(), py)
}

/// Returns the mapper's `info` section converted to a Python object.
fn info(self_: PyRef<Self>) -> PyResult<PyObject> {
    let py = self_.py();
    convert_to_py_object(self_.mapper.info(), py)
}

/// Returns the mapper's `categories` section converted to a Python object.
fn categories(self_: PyRef<Self>) -> PyResult<PyObject> {
    let py = self_.py();
    convert_to_py_object(self_.mapper.categories(), py)
}

/// Looks up the image entry for `img_id` through the mapper and returns it
/// converted to a Python object.
///
/// Lookup and conversion failures are both returned as Python exceptions.
fn get_item_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult<PyObject> {
    let item = self.mapper.get_item_dict(&img_id, &mut self.reader)?;
    convert_to_py_object(&item, py)
}

/// Looks up the annotations for `img_id` through the mapper and returns them
/// as a Python list, one converted object per annotation.
///
/// # Errors
///
/// Returns a Python exception if the lookup fails or if any annotation cannot
/// be converted to a Python object. Conversion failures are propagated to the
/// caller instead of panicking.
fn get_anns_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult<PyObject> {
    let anns = self.mapper.get_anns_dict(img_id, &mut self.reader)?;

    // Collecting into `PyResult<Vec<_>>` stops at the first failed conversion
    // and hands that error back through `?`.
    let py_anns = anns
        .iter()
        .map(|ann| convert_to_py_object(ann, py))
        .collect::<PyResult<Vec<PyObject>>>()?;

    Ok(PyList::new(py, py_anns).into())
}

/// Returns an owned copy of the image ids reported by the underlying mapper.
fn get_img_ids(&self) -> Vec<i64> {
self.mapper.get_img_ids().to_owned()
}

/// Implements Python's `len()`: the number of image ids held by the mapper.
fn __len__(&self) -> PyResult<usize> {
    let img_count = self.mapper.get_img_ids().len();
    Ok(img_count)
}
}

#[cfg(test)]
mod tests {
use std::{
env::temp_dir,
fs::{File, OpenOptions},
io::{BufReader, Write},
};

use super::*;

fn prepare(example: &str) -> (BufReader<File>, CocoPageMapper) {
let filepath = temp_dir().join("tmp.json");

let mut f = OpenOptions::new()
.read(false)
.write(true)
.create(true)
.open(&filepath)
.expect("cannot open file");
let _ = f.write_all(example.as_bytes());
let f = File::open(&filepath).expect("cannot open file");
let mut reader = BufReader::new(f);
let coco_page_mapper = CocoPageMapper::new(&mut reader).unwrap();

(reader, coco_page_mapper)
}
use crate::test_helpers::prepare_reader;

#[test]
fn test_instance() {
const EXAMPLE: &str = r#"
{
"licenses":[{"name":"","id":0,"url":""}],
"licenses":[{"name":"test_instance()","id":0,"url":""}],
"info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""},
"categories":[
{"id":1,"name":"a","supercategory":""},
Expand All @@ -254,12 +241,15 @@ mod tests {
]
}"#;

let (mut reader, coco_page_mapper) = prepare(EXAMPLE);
let (tempfile, mut reader) = prepare_reader(EXAMPLE);
let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap();

println!("{:?}", coco_page_mapper);

for img_id in [5, 6] {
let item = coco_page_mapper.get_item_dict(img_id, &mut reader).unwrap();
let item = coco_page_mapper
.get_item_dict(&img_id, &mut reader)
.unwrap();

assert_eq!(item["id"].as_i64(), Some(img_id));

Expand All @@ -278,7 +268,8 @@ mod tests {
{"licenses": [{"name": "", "id": 0, "url": ""}], "info": {"contributor": "", "date_created": "", "description": "", "url": "", "version": "", "year": ""}, "categories": [], "images": [{"id": 1, "width": 2, "height": 4, "file_name": "1.jpg", "license": 0, "flickr_url": "", "coco_url": "", "date_captured": 0}], "annotations": []}
"#;

let (mut reader, coco_page_mapper) = prepare(EXAMPLE);
let (tempfile, mut reader) = prepare_reader(EXAMPLE);
let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap();

println!("{:?}", coco_page_mapper);
}
Expand All @@ -289,7 +280,8 @@ mod tests {
{"licenses":[{"name":"","id":0,"url":""}],"info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""},"categories":[{"id":1,"name":"0","supercategory":"","isthing":0},{"id":2,"name":"1","supercategory":"","isthing":0},{"id":3,"name":"2","supercategory":"","isthing":0},{"id":4,"name":"3","supercategory":"","isthing":0},{"id":5,"name":"4","supercategory":"","isthing":0},{"id":6,"name":"5","supercategory":"","isthing":0},{"id":7,"name":"6","supercategory":"","isthing":0},{"id":8,"name":"7","supercategory":"","isthing":0},{"id":9,"name":"8","supercategory":"","isthing":0},{"id":10,"name":"9","supercategory":"","isthing":0}],"images":[{"id":1,"width":4,"height":4,"file_name":"1.jpg","license":0,"flickr_url":"","coco_url":"","date_captured":0}],"annotations":[{"image_id":1,"file_name":"1.png","segments_info":[{"id":3,"category_id":5,"area":5.0,"bbox":[1.0,0.0,2.0,2.0],"iscrowd":0}]}]}
"#;

let (mut reader, coco_page_mapper) = prepare(EXAMPLE);
let (tempfile, mut reader) = prepare_reader(EXAMPLE);
let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap();

println!("{:?}", coco_page_mapper);
}
Expand Down
Loading

0 comments on commit 45958aa

Please sign in to comment.