Skip to content

Commit

Permalink
spliting stdata into chunks on the filesystem
Browse files Browse the repository at this point in the history
with all the logic to load them back and reconstruct the .stdata file
  • Loading branch information
Frix-x committed Sep 15, 2024
1 parent eed67f1 commit b8832f8
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 44 deletions.
1 change: 1 addition & 0 deletions shaketune/commands/axes_map_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,6 @@ def axes_map_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
ConsoleOutput.print('This may take some time (1-3min)')
creator = st_process.get_graph_creator()
creator.configure(accel, SEGMENT_LENGTH)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
5 changes: 2 additions & 3 deletions shaketune/commands/axes_shaper_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
else:
input_shaper = None

measurements_manager = MeasurementsManager()

# Filter axis configurations based on user input, assuming 'axis_input' can be 'x', 'y', 'all' (that means 'x' and 'y')
filtered_config = [
a for a in AXIS_CONFIG if a['axis'] == axis_input or (axis_input == 'all' and a['axis'] in ('x', 'y'))
]
for config in filtered_config:
measurements_manager.clear_measurements() # Clear the measurements in each iteration of the loop
measurements_manager = MeasurementsManager()

# First we need to find the accelerometer chip suited for the axis
accel_chip = Accelerometer.find_axis_accelerometer(printer, config['axis'])
Expand All @@ -117,6 +115,7 @@ def axes_shaper_calibration(gcmd, config, st_process: ShakeTuneProcess) -> None:
# And finally generate the graph for each measured axis
ConsoleOutput.print(f'{config["axis"].upper()} axis frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
toolhead.dwell(1)
Expand Down
1 change: 1 addition & 0 deletions shaketune/commands/compare_belts_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,6 @@ def compare_belts_responses(gcmd, config, st_process: ShakeTuneProcess) -> None:
# Run post-processing
ConsoleOutput.print('Belts comparative frequency profile generation...')
ConsoleOutput.print('This may take some time (1-3min)')
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
5 changes: 5 additions & 0 deletions shaketune/commands/create_vibrations_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ def create_vibrations_profile(gcmd, config, st_process: ShakeTuneProcess) -> Non
accelerometer.stop_recording()
accelerometer.wait_for_samples()

# For this command, we need to wait for the data transfers after finishing each of
# the measurements as there is a high probability to have a lot of measurements in
# the measurement manager and that chunks are written into the /tmp filesystem folder
measurements_manager.wait_for_data_transfers(printer.get_reactor())

toolhead.dwell(0.3)
toolhead.wait_moves()

Expand Down
1 change: 1 addition & 0 deletions shaketune/commands/excitate_axis_at_freq.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,6 @@ def excitate_axis_at_freq(gcmd, config, st_process: ShakeTuneProcess) -> None:

creator = st_process.get_graph_creator()
creator.configure(freq, duration, accel_per_hz)
measurements_manager.wait_for_data_transfers(printer.get_reactor())
st_process.run(measurements_manager)
st_process.wait_for_completion()
154 changes: 113 additions & 41 deletions shaketune/helpers/accelerometer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import pickle
import time
import uuid
from multiprocessing import Process
from pathlib import Path
from typing import List, Tuple, TypedDict
Expand All @@ -26,6 +27,8 @@
Sample = Tuple[float, float, float, float]
SamplesList = List[Sample]

CHUNK_SIZE = 15 # Maximum number of measurements to keep in memory at once


class Measurement(TypedDict):
name: str
Expand All @@ -35,30 +38,73 @@ class Measurement(TypedDict):
class MeasurementsManager:
def __init__(self):
self.measurements: List[Measurement] = []
self._write_process = None

def add_measurement(self, name: str, samples: SamplesList = None):
samples = samples if samples is not None else []
self.measurements.append({'name': name, 'samples': samples})

def get_measurements(self) -> List[Measurement]:
return self.measurements
self._uuid = str(uuid.uuid4())[:8]
self._temp_dir = Path(f'/tmp/shaketune_{self._uuid}')
self._temp_dir.mkdir(parents=True, exist_ok=True)
self._chunk_files = []
self._write_processes = []

def clear_measurements(self, keep_last: bool = False):
if keep_last:
self.measurements = [self.measurements[-1]]
else:
self.measurements = []

def append_samples_to_last_measurement(self, additional_samples: SamplesList):
try:
self.measurements[-1]['samples'].extend(additional_samples)
except IndexError as err:
raise ValueError('no measurements available to append samples to.') from err

def clear_measurements(self):
self.measurements = []
def add_measurement(self, name: str, samples: SamplesList = None):
samples = samples if samples is not None else []
self.measurements.append({'name': name, 'samples': samples})
if len(self.measurements) > CHUNK_SIZE:
self._save_chunk()

def _save_chunk(self):
# Save the measurements to the chunk file. We keep the last measurement
# in memory to be able to append new samples to it later if needed
chunk_filename = self._temp_dir / f'{self._uuid}_{len(self._chunk_files)}.stchunk'
process = Process(target=self._save_to_file, args=(chunk_filename, self.measurements[:-1].copy()))
process.daemon = False
process.start()
self._write_processes.append(process)
self._chunk_files.append(chunk_filename)
self.clear_measurements(keep_last=True)

def save_stdata(self, filename: Path):
self._write_process = Process(target=self._save_to_file, args=(filename,))
self._write_process.daemon = False
self._write_process.start()
process = Process(target=self._reassemble_chunks, args=(filename,))
process.daemon = False
process.start()
self._write_processes.append(process)

def _save_to_file(self, filename: Path):
def _reassemble_chunks(self, filename: Path):
try:
os.nice(19)
except Exception:
pass # Ignore errors as it's not critical
try:
all_measurements = []
for chunk_file in self._chunk_files:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
os.remove(chunk_file) # Remove the chunk file after reading

# Include any remaining measurements in memory
if self.measurements:
all_measurements.extend(self.measurements)

# Save all measurements to the final .stdata file
self._save_to_file(filename, all_measurements)

# Clean up
self.clear_measurements()
self._chunk_files = []
except Exception as e:
ConsoleOutput.print(f'Warning: unable to assemble chunks into {filename}: {e}')

def _save_to_file(self, filename: Path, measurements: List[Measurement]):
try:
os.nice(19)
except Exception:
Expand All @@ -67,15 +113,54 @@ def _save_to_file(self, filename: Path):
with open(filename, 'wb') as f:
cctx = zstd.ZstdCompressor(level=3)
with cctx.stream_writer(f) as compressor:
pickle.dump(self.measurements, compressor)
pickle.dump(measurements, compressor)
except Exception as e:
ConsoleOutput.print(f'Warning: unable to save the data to {filename}: {e}')

def wait_for_data_transfers(self, k_reactor, timeout: int = 30):
if not self._write_processes:
return # No file write is pending

eventtime = k_reactor.monotonic()
endtime = eventtime + timeout
complete = False

while eventtime < endtime:
eventtime = k_reactor.pause(eventtime + 0.05)
if all(not p.is_alive() for p in self._write_processes):
complete = True
break

if not complete:
raise TimeoutError(
'Shake&Tune was unable to write the accelerometer data on the fylesystem. '
'This might be due to a slow, busy or full SD card.'
)

self._write_processes = []

def _load_measurements_from_file(self, filename: Path) -> List[Measurement]:
try:
with open(filename, 'rb') as f:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(f) as decompressor:
measurements = pickle.load(decompressor)
return measurements
except Exception as e:
ConsoleOutput.print(f'Warning: unable to save the measurements to {filename}: {e}')
ConsoleOutput.print(f'Warning: unable to load measurements from {filename}: {e}')
return []

def get_measurements(self) -> List[Measurement]:
all_measurements = []
for chunk_file in self._chunk_files:
chunk_measurements = self._load_measurements_from_file(chunk_file)
all_measurements.extend(chunk_measurements)
all_measurements.extend(self.measurements) # Include any remaining measurements in memory

return all_measurements

def load_from_stdata(self, filename: Path) -> List[Measurement]:
with open(filename, 'rb') as f:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(f) as decompressor:
self.measurements = pickle.load(decompressor)
self.measurements = self._load_measurements_from_file(filename)
return self.measurements

def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]:
Expand Down Expand Up @@ -124,27 +209,14 @@ def load_from_csvs(self, klipper_CSVs: List[Path]) -> List[Measurement]:

return self.measurements

def wait_for_file_writes(self, k_reactor, timeout: int = 30):
if self._write_process is None:
return # No file write is pending

eventtime = k_reactor.monotonic()
endtime = eventtime + timeout
complete = False

while eventtime < endtime:
eventtime = k_reactor.pause(eventtime + 0.05)
if not self._write_process.is_alive():
complete = True
break

if not complete:
raise TimeoutError(
'Shake&Tune was unable to write the accelerometer data into the .STDATA file. '
'This might be due to a slow SD card or a busy or full filesystem.'
)

self._write_process = None
def __del__(self):
try:
if self._temp_dir.exists():
for chunk_file in self._temp_dir.glob('*.stchunk'):
chunk_file.unlink()
self._temp_dir.rmdir()
except Exception:
pass # Ignore errors during cleanup


class Accelerometer:
Expand Down

0 comments on commit b8832f8

Please sign in to comment.