2 from sqlparse.sql import Comment
3 from sqlobject.converters import sqlrepr
4 from sqlparse import parse
5 from sqlparse.compat import PY3
6 from sqlparse import tokens as T
# Scan every token yielded by token_list.flatten() and test whether its
# ttype is the sqlparse T.Error token type.
# NOTE(review): the action taken on a match, and whatever follows the loop,
# is not visible in this listing (the embedded numbering skips lines here)
# -- confirm against the complete file before relying on a return value.
9 def find_error(token_list):
11 for token in token_list.flatten():
12 if token.ttype is T.Error:
def is_comment_or_space(token):
    """Tell whether *token* is a comment, a newline or whitespace.

    Returns True when *token* is an instance of sqlparse's ``Comment`` or
    when its ``ttype`` is one of the comment, newline or whitespace token
    types; returns False otherwise.
    """
    if isinstance(token, Comment):
        return True
    ignorable_ttypes = (
        T.Comment, T.Comment.Single, T.Comment.Multiline,
        T.Newline, T.Whitespace,
    )
    return token.ttype in ignorable_ttypes
# Inspect the tokens of `statement` one by one, checking each token's ttype
# against T.Newline.
# NOTE(review): the suite of the `if` and the code after the loop are not
# visible in this listing (embedded numbering skips lines) -- presumably the
# function reports whether the statement consists only of newlines; confirm.
23 def is_newline_statement(statement):
# statement.tokens[:] iterates over a shallow copy of the token list.
24 for token in statement.tokens[:]:
25 if token.ttype is not T.Newline:
# Rewrite, in place, every token from token_list.flatten() whose ttype is
# T.String.Single: the first and last characters of its text are dropped,
# the remainder is passed through sqlrepr() together with `dbname`, and the
# result is stored back on the token.
# NOTE(review): embedded line 31 is missing from this listing; confirm that
# nothing besides a docstring sits between the `def` and the loop.
30 def escape_strings(token_list, dbname):
32 for token in token_list.flatten():
33 if token.ttype is T.String.Single:
34 value = token.value[1:-1] # unquote by removing apostrophes
# sqlrepr comes from sqlobject.converters; presumably it re-quotes and
# escapes the text for the backend named by dbname -- verify there.
35 value = sqlrepr(value, dbname)
# Both attributes are overwritten so they carry the same converted text.
36 token.normalized = token.value = value
43 class StatementGrouper(object):
44 """Collect lines and reparse until the last statement is complete"""
# Remember the optional `encoding`; it is later forwarded to parse() as the
# `encoding` keyword argument.
# NOTE(review): other methods read self.lines and self.statements, but their
# initialisation is not visible in this listing (embedded lines 47-48 are
# missing) -- confirm they are set up here.
46 def __init__(self, encoding=None):
49 self.encoding = encoding
# Buffer one more piece of input text; the buffered pieces are later joined
# with ''.join(self.lines) before being parsed.
# NOTE(review): anything following the append is not visible in this
# listing -- confirm whether a re-parse is triggered from here.
51 def process_line(self, line):
52 self.lines.append(line)
# Re-parse everything buffered in self.lines as a single string and look at
# the last parsed statement to decide whether it is terminated by ';'.
# NOTE(review): several lines of this method are missing from the listing
# (embedded numbering skips 57-58, 63 and 67-68), so the handling of an empty
# parse result, of skipped tokens and of the incomplete case is not visible.
55 def process_lines(self):
56 statements = parse(''.join(self.lines), encoding=self.encoding)
59 last_stmt = statements[-1]
# Walk the last statement's tokens backwards, from the final index down to
# index 1. NOTE(review): the range stops before 0, so the first token is
# never inspected -- possible off-by-one, confirm this is intended.
# NOTE(review): xrange is a Python 2 builtin; no Python 3 alias is visible
# in this listing (PY3 is imported at the top) -- confirm one exists.
60 for i in xrange(len(last_stmt.tokens) - 1, 0, -1):
61 token = last_stmt.tokens[i]
# Trailing comments/whitespace are tested for here; the action taken
# (embedded line 63) is not visible.
62 if is_comment_or_space(token):
64 if token.ttype is T.Punctuation and token.value == ';':
65 break # The last statement is complete
66 # The last statement is still incomplete - wait for the next line
# Publish the parsed statements on the instance.
69 self.statements = statements
# Iterate over the statements currently stored in self.statements.
# NOTE(review): the loop body and anything after the loop are not visible
# in this listing (embedded numbering skips lines) -- confirm how each
# statement is handed to the caller and whether the list is reset afterwards.
71 def get_statements(self):
72 for stmt in self.statements:
80 tokens = parse(''.join(self.lines), encoding=self.encoding)
82 if not is_comment_or_space(token):
83 raise ValueError("Incomplete SQL statement: %s" %