Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/functions/mock.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

53 statements  

1import contextlib 

2import datetime 

3import inspect 

4import io 

5import re 

6 

7import sqlalchemy as sa 

8 

9 

def create_mock_engine(bind, stream=None):
    """Build a mock SQLAlchemy engine for an existing engine or a bind URL.

    :param bind: A SQLAlchemy engine (its ``url`` attribute is used) or a
        bind URL string to mock.
    :param stream: Render all DDL operations to the stream. When ``None``,
        the statements handed to the mock executor are discarded.
    :returns: The mock engine.
    """
    # Anything that is not already a URL string is expected to carry ``.url``.
    bind_url = bind if isinstance(bind, str) else str(bind.url)

    if stream is None:

        def dump(*args, **kw):
            return None

    else:

        def dump(sql, *args, **kwargs):
            # ``engine`` is bound further down in the enclosing function; it
            # is only looked up when the executor is actually invoked.
            dialect = engine.dialect

            class Compiler(type(sql._compiler(dialect))):
                # Inline bound parameters as literals instead of placeholders.
                def visit_bindparam(self, bindparam, *args, **kwargs):
                    return self.render_literal_value(
                        bindparam.value, bindparam.type
                    )

                def render_literal_value(self, value, type_):
                    if isinstance(value, int):
                        return str(value)
                    if isinstance(value, (datetime.date, datetime.datetime)):
                        return "'%s'" % value
                    return super().render_literal_value(value, type_)

            rendered = str(Compiler(dialect, sql).process(sql))
            # Collapse runs of newlines, then trim surrounding whitespace.
            rendered = re.sub(r'\n+', '\n', rendered).strip('\n').strip()

            stream.write('\n%s;' % rendered)

    try:
        engine = sa.create_mock_engine(bind_url, executor=dump)
    except AttributeError:  # SQLAlchemy <1.4
        engine = sa.create_engine(bind_url, strategy='mock', executor=dump)
    return engine

55 

56 

@contextlib.contextmanager
def mock_engine(engine, stream=None):
    """Mocks out the engine specified in the passed bind expression.

    Note this function is meant for convenience and protected usage. Do NOT
    blindly pass user input to this function as it uses exec.

    The expression is evaluated in each calling frame in turn; the first
    frame in which it evaluates successfully is the one whose engine gets
    replaced. The original engine is put back when the ``with`` block exits,
    including when the block exits because of an exception.

    :param engine: A python expression that represents the engine to mock.
    :param stream: Render all DDL operations to the stream. A new
        ``io.StringIO`` is created when omitted.
    :returns: A context manager yielding the stream.
    :raises ValueError: If the expression cannot be evaluated in any frame
        on the call stack.
    """

    # Create a stream if not present.

    if stream is None:
        stream = io.StringIO()

    # Navigate the stack and find the calling frame that allows the
    # expression to execute.

    for frame_info in inspect.stack()[1:]:
        frame = frame_info[0]
        try:
            exec('__target = %s' % engine, frame.f_globals, frame.f_locals)
            target = frame.f_locals['__target']
            break

        except Exception:
            # Deliberately best-effort: the expression is simply not valid
            # in this frame, so move on to the next one up the stack.
            continue

    else:
        raise ValueError('Not a valid python expression', engine)

    # Evaluate the expression and get the target engine.

    frame.f_locals['__mock'] = create_mock_engine(target, stream)

    # Replace the target with our mock.

    exec('%s = __mock' % engine, frame.f_globals, frame.f_locals)

    try:
        # Give control back.

        yield stream

    finally:
        # Put the target engine back. This must run even when the body of
        # the ``with`` block raises; otherwise the caller would be left
        # holding the mock engine and the temporary names.

        frame.f_locals['__target'] = target
        exec('%s = __target' % engine, frame.f_globals, frame.f_locals)
        exec('del __target', frame.f_globals, frame.f_locals)
        exec('del __mock', frame.f_globals, frame.f_locals)