1 # -*- coding: utf-8 -*-
3 from pytest import raises
4 from sqlparse import parse
6 from sqlconvert.print_tokens import tlist2str
7 from sqlconvert.process_mysql import remove_directive_tokens, \
8 is_directive_statement, requote_names, unescape_strings, \
10 from sqlconvert.process_tokens import escape_strings, StatementGrouper
14 parsed = parse("insert into test (1, 'тест')", 'utf-8')[0]
15 query = tlist2str(parsed).encode('utf-8')
17 u"INSERT INTO test (1, 'тест')".encode('utf-8')
21 parsed = parse(u"insert into test (1, 'тест')")[0]
22 query = tlist2str(parsed)
23 assert query, u"INSERT INTO test (1 == 'тест')"
27 parsed = parse("select /*! test */ * from /* test */ `T`")[0]
28 remove_directive_tokens(parsed)
29 query = tlist2str(parsed)
30 assert query == u'SELECT * FROM /* test */ `T`'
33 def test_directive_statement():
34 parsed = parse("/*! test */ test ;")[0]
35 assert not is_directive_statement(parsed)
36 parsed = parse("/*! test */ ;")[0]
37 assert is_directive_statement(parsed)
41 parsed = parse("select * from `T`")[0]
43 query = tlist2str(parsed)
44 assert query == u'SELECT * FROM "T"'
47 def test_unescape_string():
48 parsed = parse("insert into test values ('\"te\\'st\\\"\\n')")[0]
49 unescape_strings(parsed)
50 query = tlist2str(parsed)
51 assert query == u"INSERT INTO test VALUES ('\"te'st\"\n')"
54 def test_escape_string_postgres():
55 parsed = parse("insert into test values ('\"te\\'st\\\"\\n')")[0]
56 unescape_strings(parsed)
57 escape_strings(parsed, 'postgres')
58 query = tlist2str(parsed)
59 assert query == u"INSERT INTO test VALUES (E'\"te''st\"\\n')"
62 def test_escape_string_sqlite():
63 parsed = parse("insert into test values ('\"te\\'st\\\"\\n')")[0]
64 unescape_strings(parsed)
65 escape_strings(parsed, 'sqlite')
66 query = tlist2str(parsed)
67 assert query == u"INSERT INTO test VALUES ('\"te''st\"\n')"
71 parsed = parse("select /*! test */ * from /* test */ `T`")[0]
72 process_statement(parsed)
73 query = tlist2str(parsed)
74 assert query == u'SELECT * FROM /* test */ "T"'
77 def test_incomplete():
78 grouper = StatementGrouper()
79 grouper.process_line("select * from `T`")
80 assert not grouper.statements
81 assert len(grouper.statements) == 0
82 raises(ValueError, grouper.close)
85 def test_statements():
86 grouper = StatementGrouper()
87 grouper.process_line("select * from T;")
88 assert grouper.statements
89 assert len(grouper.statements) == 1
90 for statement in grouper.get_statements():
91 query = tlist2str(statement)
92 assert query == 'SELECT * FROM T;'
93 assert len(grouper.statements) == 0
94 assert grouper.close() is None