Skip to content

Commit

Permalink
fix device propogation for noise and add noise tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jtigue-bdai committed Oct 7, 2024
1 parent 225a896 commit bf3cc4e
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,21 @@


def constant_noise(data: torch.Tensor, cfg: noise_cfg.ConstantNoiseCfg) -> torch.Tensor:
"""Constant noise."""
"""Applies a constant noise bias to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for constant noise.
Returns:
The data modified by the noise parameters provided.
"""

# fix tensor device for bias on first call and update config parameters
if isinstance(cfg.bias,torch.Tensor):
if cfg.bias.device is not data.device:
cfg.bias = cfg.bias.to(data.device,)

if cfg.operation == "add":
return data + cfg.bias
elif cfg.operation == "scale":
Expand All @@ -30,7 +44,25 @@ def constant_noise(data: torch.Tensor, cfg: noise_cfg.ConstantNoiseCfg) -> torch


def uniform_noise(data: torch.Tensor, cfg: noise_cfg.UniformNoiseCfg) -> torch.Tensor:
"""Uniform noise."""
"""Applies a uniform noise to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for uniform noise.
Returns:
The data modified by the noise parameters provided.
"""

# fix tensor device for n_max on first call and update config parameters
if isinstance(cfg.n_max,torch.Tensor):
if cfg.n_max.device is not data.device:
cfg.n_max = cfg.n_max.to(data.device)
# fix tensor device for n_min on first call and update config parameters
if isinstance(cfg.n_min,torch.Tensor):
if cfg.n_min.device is not data.device:
cfg.n_min = cfg.n_min.to(data.device)

if cfg.operation == "add":
return data + torch.rand_like(data) * (cfg.n_max - cfg.n_min) + cfg.n_min
elif cfg.operation == "scale":
Expand All @@ -42,7 +74,25 @@ def uniform_noise(data: torch.Tensor, cfg: noise_cfg.UniformNoiseCfg) -> torch.T


def gaussian_noise(data: torch.Tensor, cfg: noise_cfg.GaussianNoiseCfg) -> torch.Tensor:
"""Gaussian noise."""
"""Applies a gaussian noise to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for gaussian noise.
Returns:
The data modified by the noise parameters provided.
"""

# fix tensor device for mean on first call and update config parameters
if isinstance(cfg.mean,torch.Tensor):
if cfg.mean.device is not data.device:
cfg.mean = cfg.mean.to(data.device)
# fix tensor device for std on first call and update config parameters
if isinstance(cfg.std,torch.Tensor):
if cfg.std.device is not data.device:
cfg.std = cfg.std.to(data.device)

if cfg.operation == "add":
return data + cfg.mean + cfg.std * torch.randn_like(data)
elif cfg.operation == "scale":
Expand Down
124 changes: 124 additions & 0 deletions source/extensions/omni.isaac.lab/test/utils/test_noise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Launch Isaac Sim Simulator first."""

from omni.isaac.lab.app import AppLauncher, run_tests

# launch omniverse app
app_launcher = AppLauncher(headless=True)
simulation_app = app_launcher.app

"""Rest everything follows."""

import time
import torch
import unittest
from dataclasses import MISSING

import omni.isaac.lab.utils.noise as noise
from omni.isaac.lab.utils import configclass



class TestNoise(unittest.TestCase):
"""Test different noise implementations."""

def test_gaussian_noise(self):
"""Test guassian_noise function."""

for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
with self.subTest(device=device, noise_device=noise_device, operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define standard deviation and mean
std = torch.tensor([0.1,0.2,0.3],device=noise_device)
mean = torch.tensor([0.4,0.5,0.6],device=noise_device)
# create noise config
noise_cfg = noise.GaussianNoiseCfg(std=std,
mean=mean,
operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
std_result, mean_result = torch.std_mean(noisy_data-data,dim=0)
elif op=="scale":
std_result, mean_result = torch.std_mean(noisy_data/data,dim=0)
elif op=="abs":
std_result, mean_result = torch.std_mean(noisy_data,dim=0)

self.assertTrue(noise_cfg.mean.device,device)
self.assertTrue(noise_cfg.std.device,device)
torch.testing.assert_close(noise_cfg.std,std_result,atol=1e-2,rtol=1e-2)
torch.testing.assert_close(noise_cfg.mean,mean_result,atol=1e-2,rtol=1e-2)


def test_uniform_noise(self):
"""Test uniform_noise function."""
for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
with self.subTest(device=device, noise_device=noise_device,operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define uniform minimum and maximum
n_min = torch.tensor([0.1,0.2,0.3],device=noise_device)
n_max = torch.tensor([0.4,0.5,0.6],device=noise_device)
# create noise config
noise_cfg = noise.UniformNoiseCfg(n_max=n_max, n_min=n_min,operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
min_result, _ = torch.min(noisy_data-data,dim=0)
max_result, _ = torch.max(noisy_data-data,dim=0)
elif op=="scale":
min_result, _ = torch.min(torch.div(noisy_data,data),dim=0)
max_result, _ = torch.max(torch.div(noisy_data,data),dim=0)
elif op=="abs":
min_result, _ = torch.min(noisy_data,dim=0)
max_result, _ = torch.max(noisy_data,dim=0)

self.assertTrue(noise_cfg.n_min.device,device)
self.assertTrue(noise_cfg.n_max.device,device)
self.assertTrue(all(torch.le(noise_cfg.n_min, min_result).tolist()))
self.assertTrue(all(torch.ge(noise_cfg.n_max, max_result).tolist()))

def test_constant_noise(self):
"""Test constant_noise"""
for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
with self.subTest(device=device, noise_device=noise_device,operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define a bias
bias = torch.tensor([0.1,0.2,0.3],device=noise_device)
# create noise config
noise_cfg = noise.ConstantNoiseCfg(bias=bias, operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
bias_result = noisy_data-data
elif op=="scale":
bias_result = noisy_data/data
elif op=="abs":
bias_result = noisy_data

self.assertTrue(noise_cfg.bias.device,device)
torch.testing.assert_close(noise_cfg.bias.repeat(data.shape[0],1),bias_result)

if __name__ == "__main__":
run_tests()

0 comments on commit bf3cc4e

Please sign in to comment.