diff --git a/lib/tlitest/test_tlog_rec_session.py b/lib/tlitest/test_tlog_rec_session.py index eba34d8..7c0fd14 100644 --- a/lib/tlitest/test_tlog_rec_session.py +++ b/lib/tlitest/test_tlog_rec_session.py @@ -4,7 +4,7 @@ import time import inspect from tempfile import mkdtemp -from subprocess import Popen, PIPE, STDOUT +from subprocess import Popen, PIPE, STDOUT, run import pytest from misc import check_recording, mklogfile, mkcfgfile, \ @@ -22,6 +22,25 @@ def utempter_enabled(): return 'libutempter.so' in stdout_data +@pytest.fixture +def gen_ssh_key(request): + def _del_ssh_key(user): + # Remove ssh key to ensure no tests affected later + run(f'sed -i "/stdin_test/d" ~{user}/.ssh/authorized_keys', shell=True) + run(f'rm -rf ~/.ssh/id_rsa_{user}*', shell=True) + + def _gen_ssh_key(user): + # Generate SSH Key for test + run(f'rm -rf ~/.ssh/id_rsa_{user}*', shell=True) + run(f'ssh-keygen -t rsa -b 2048 -N "" -C stdin_test -f ~/.ssh/id_rsa_{user}', shell=True) + run(f"mkdir -p ~{user}/.ssh", shell=True) + run(f"cat ~/.ssh/id_rsa_{user}.pub >> ~{user}/.ssh/authorized_keys", shell=True) + run(f"chown -R {user}:{user} ~{user}/.ssh", shell=True) + request.addfinalizer(lambda: _del_ssh_key(user)) + + return _gen_ssh_key + + class TestTlogRecSession: """ Test tlog-rec-session functionality """ user = 'tlitestlocaluser2' @@ -211,16 +230,21 @@ def test_session_record_with_different_shell(self): check_recording(shell, '/usr/bin/tcsh', logfile) shell.sendline('exit') - def test_session_record_pipe_io_stdin(self): + def test_session_record_pipe_io_stdin(self, gen_ssh_key): """ Pipe I/O through stdin """ text_in_stdio = 'print("hello world")\n' text_out = "hello world" - p = Popen(['sshpass', '-p', 'Secret123', 'ssh', '-o', - 'StrictHostKeyChecking=no', - 'tlitestlocaluser2@localhost', 'python3'], - stdout=PIPE, stdin=PIPE, stderr=PIPE, encoding='utf8') + + sessionclass = TlogRecSessionConfig(writer="syslog") + sessionclass.generate_config(SYSTEM_TLOG_REC_SESSION_CONF) + + gen_ssh_key(self.user) + p = Popen(['ssh', '-i', f'~/.ssh/id_rsa_{self.user}', + '-o', 'StrictHostKeyChecking=no', + f'{self.user}@localhost', 'python3'], + stdout=PIPE, stdin=PIPE, stderr=PIPE, encoding='utf8') stdout_data = p.communicate(input=text_in_stdio)[0] assert text_out in stdout_data