bebraw
1/14/2010 - 1:39 PM

scenario_tester.py

from placidity.scenario_tester import EllipsisOutput, Input, InputError, \
NotRunningError, MatchError, Meta, Output, OutputError, RunningError, \
ScenarioTester
from py.test import raises

class AbstractApplication:
    def run(self):
        try:
            while True:
                input = self.input()

                result = self.interpret(input)

                if result:
                    self.output(result)
        except SystemExit:
            pass

    def interpret(self, input):
        pass

class TestScenarioTester:
    def test_passing_test(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'a':
                    return 4

        scenario = '''
>>> a = 4
>>> a
4
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 3
        self.assert_line(lines, 1, Input, 'a = 4')
        self.assert_line(lines, 2, Input, 'a')
        self.assert_line(lines, 3, Output, '4')

        # this should not trigger any asserts
        scenario_tester.test(scenario)

    def test_running(self):
        class Application(AbstractApplication):
            pass
        
        scenario = '''
>>> a = 4
--- running
>>> b = 5
--- running
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 4
        self.assert_line(lines, 1, Input, 'a = 4')
        self.assert_line(lines, 2, Meta, 'running')
        self.assert_line(lines, 3, Input, 'b = 5')
        self.assert_line(lines, 4, Meta, 'running')

        # this should not trigger any asserts
        scenario_tester.test(scenario)

    def test_running_fail(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'quit':
                    raise SystemExit

        scenario = '''
>>> a = 4
--- running
>>> quit
--- running
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 4
        self.assert_line(lines, 1, Input, 'a = 4')
        self.assert_line(lines, 2, Meta, 'running')
        self.assert_line(lines, 3, Input, 'quit')
        self.assert_line(lines, 4, Meta, 'running')

        raises(NotRunningError, scenario_tester.test, scenario)

    def test_not_running(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'quit':
                    raise SystemExit

        scenario = '''
>>> quit
--- not running
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 2
        self.assert_line(lines, 1, Input, 'quit')
        self.assert_line(lines, 2, Meta, 'not running')

        # this should not trigger any asserts
        scenario_tester.test(scenario)

    def test_not_running_fail(self):
        class Application(AbstractApplication):
            pass

        scenario = '''
>>> a = 5
--- not running
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 2
        self.assert_line(lines, 1, Input, 'a = 5')
        self.assert_line(lines, 2, Meta, 'not running')

        raises(RunningError, scenario_tester.test, scenario)

    def test_restart(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'quit':
                    raise SystemExit

        scenario = '''
>>> quit
--- not running
--- restart
--- running
--- restart
--- running
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 6
        self.assert_line(lines, 1, Input, 'quit')
        self.assert_line(lines, 2, Meta, 'not running')
        self.assert_line(lines, 3, Meta, 'restart')
        self.assert_line(lines, 4, Meta, 'running')
        self.assert_line(lines, 5, Meta, 'restart')
        self.assert_line(lines, 6, Meta, 'running')

        # this should not trigger any asserts
        scenario_tester.test(scenario)

    def test_ellipsis(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'a':
                    return 5

        scenario = '''
>>> a = 5
>>> a
...
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 3
        self.assert_line(lines, 1, Input, 'a = 5')
        self.assert_line(lines, 2, Input, 'a')
        self.assert_line(lines, 3, EllipsisOutput, None)

        scenario_tester.test(scenario)

    def test_input_fail(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                pass

        scenario = '''
fail
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 1
        self.assert_line(lines, 1, Output, 'fail')

        raises(InputError, scenario_tester.test, scenario)

    def test_match_fail(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                if input == 'a':
                    return 42

        scenario = '''
>>> a = 4
>>> a
5
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 3
        self.assert_line(lines, 1, Input, 'a = 4')
        self.assert_line(lines, 2, Input, 'a')
        self.assert_line(lines, 3, Output, '5')

        raises(MatchError, scenario_tester.test, scenario)

    def test_output_fail(self):
        class Application(AbstractApplication):
            def interpret(self, input):
                return 42

        scenario = '''
>>> fail
>>> fail
'''

        scenario_tester = ScenarioTester(Application)
        scenario_tester.parse(scenario)
        lines = scenario_tester.lines

        assert len(lines) == 2
        self.assert_line(lines, 1, Input, 'fail')
        self.assert_line(lines, 2, Input, 'fail')

        raises(OutputError, scenario_tester.test, scenario)

    def assert_line(self, lines, line_number, line_type, line_content):
        line = lines[line_number - 1]
        assert isinstance(line, line_type)
        assert line.content == line_content
import types
from collections import deque

class InputError(Exception):
    pass

class NotRunningError(Exception):
    pass

class MatchError(Exception):
    pass

class OutputError(Exception):
    pass

class RunningError(Exception):
    pass

class Line:
    def __init__(self, line):
        self.content = line

    def check_input(self, app):
        raise InputError, 'Expected input but got output instead!' + \
            ' Failed at line "%s".' % self.content

    def check_output(self, result):
        raise OutputError, 'Expected output but got input instead!' + \
            ' Failed at line "%s". Result: %s.' % (self.content, result)

class PrefixLine(Line):
    def __init__(self, line):
        self.content = line.strip(self.prefix)

    @classmethod
    def matches(cls, line):
        return line.startswith(cls.prefix)

class Input(PrefixLine):
    prefix = '>>> '

    def check_input(self, app):
        return self.content

class Meta(PrefixLine):
    prefix = '--- '

    def check_input(self, app):
        def not_running():
            if app.running:
                raise RunningError, 'The application was expected to be ' + \
                    'halted but it was running instead!'

        def running():
            if not app.running:
                raise NotRunningError, 'The application was expected to ' + \
                    'be running but it was halted instead!'

        def restart():
            app.running = True

        {'not running': not_running, 'running': running,
            'restart': restart}[self.content]()

class Output(Line):
    @classmethod
    def matches(cls, line):
        return True

    def check_output(self, result):
        if self.content != str(result):
            raise MatchError, "Output content didn't match!" + \
                " Expected %s (%s) but got %s (%s) instead." \
                % (self.content, type(self.content), result, type(result))

class EllipsisOutput(Line):
    class Content:
        def __eq__(self, other):
            return True

    def __init__(self, line):
        self.content = self.Content()

    @classmethod
    def matches(cls, line):
        return line.startswith('...')

    def check_output(self, result):
        pass

class LineParser:
    line_types = (EllipsisOutput, Input, Meta, Output, )

    def parse(self, scenario):
        lines = deque()

        for line in scenario.split('\n'):
            parsed_line = self._parse_line(line)

            if parsed_line:
                lines.append(parsed_line)

        return lines

    def _parse_line(self, line):
        line = line.strip()

        if len(line) == 0:
            return

        for line_type in self.line_types:
            if line_type.matches(line):
                return line_type(line)

class ScenarioTester:
    def __init__(self, app_class):
        self._set_hooks(app_class)

    def test(self, scenario):
        self.parse(scenario)
        self.app.running = True

        while len(self.lines) > 0:
            self.app.run()
            self.app.running = False

    def parse(self, scenario):
        line_parser = LineParser()
        self.lines = line_parser.parse(scenario)

    def _set_hooks(self, app_class):
        self.app = app_class()

        def input(app):
            if len(self.lines) == 0:
                raise SystemExit

            current_line = self.lines.popleft()
            
            return current_line.check_input(app)

        self.app.input = types.MethodType(input, self.app, app_class)

        def output(app, result):
            current_line = self.lines.popleft()

            return current_line.check_output(result)

        self.app.output = types.MethodType(output, self.app, app_class)