2 from sqlparse.sql import Comment
3 from sqlparse import tokens as T
4 from .process_tokens import escape_strings
7 def _is_directive_token(token):
8 if isinstance(token, Comment):
9 subtokens = token.tokens
11 comment = subtokens[0]
12 if comment.ttype is T.Comment.Multiline and \
13 comment.value.startswith('/*!'):
18 def is_directive_statement(statement):
19 tokens = statement.tokens
20 if not _is_directive_token(tokens[0]):
22 if tokens[-1].ttype is not T.Punctuation or tokens[-1].value != ';':
24 for token in tokens[1:-1]:
25 if token.ttype not in (T.Newline, T.Whitespace):
30 def remove_directive_tokens(statement):
31 """Remove /*! directives */ from the first-level"""
33 for token in statement.tokens:
34 if _is_directive_token(token):
36 new_tokens.append(token)
37 statement.tokens = new_tokens
40 def requote_names(token_list):
41 """Remove backticks, quote non-lowercase identifiers"""
42 for token in token_list.flatten():
43 if token.ttype is T.Name:
45 if (value[0] == "`") and (value[-1] == "`"):
48 token.normalized = token.value = value
50 token.normalized = token.value = '"%s"' % value
53 def unescape_strings(token_list):
54 """Unescape strings"""
55 for token in token_list.flatten():
56 if token.ttype is T.String.Single:
69 value = value.replace(orig, repl)
70 token.normalized = token.value = value
73 def process_statement(statement, quoting_style='sqlite'):
74 remove_directive_tokens(statement)
75 requote_names(statement)
76 unescape_strings(statement)
77 escape_strings(statement, quoting_style)