+from sqlobject.converters import sqlrepr
from sqlparse import parse
from sqlparse.compat import PY3
-from sqlparse.tokens import Error, Punctuation, Comment, Newline, Whitespace
+from sqlparse import tokens as T
def find_error(token_list):
"""Find an error"""
for token in token_list.flatten():
- if token.ttype is Error:
+ if token.ttype is T.Error:
return True
return False
+def is_newline_statement(statement):
+ for token in statement.tokens[:]:
+ if token.ttype is not T.Newline:
+ return False
+ return True
+
+
+def escape_strings(token_list, dbname):
+ """Escape strings"""
+ for token in token_list.flatten():
+ if token.ttype is T.String.Single:
+ value = token.value[1:-1] # unquote by removing apostrophes
+ value = sqlrepr(value, dbname)
+ token.normalized = token.value = value
+
+
if PY3:
xrange = range
last_stmt = statements[-1]
for i in xrange(len(last_stmt.tokens) - 1, 0, -1):
token = last_stmt.tokens[i]
- if token.ttype in (Comment.Single, Comment.Multiline,
- Newline, Whitespace):
+ if token.ttype in (T.Comment.Single, T.Comment.Multiline,
+ T.Newline, T.Whitespace):
continue
- if token.ttype is Punctuation and token.value == ';':
+ if token.ttype is T.Punctuation and token.value == ';':
break # The last statement is complete
# The last statement is still incomplete - wait for the next line
return
for stmt in self.statements:
yield stmt
self.statements = []
+ raise StopIteration
def close(self):
if not self.lines:
return
tokens = parse(''.join(self.lines), encoding=self.encoding)
for token in tokens:
- if (token.ttype not in (Comment.Single, Comment.Multiline,
- Newline, Whitespace)):
+ if (token.ttype not in (T.Comment.Single, T.Comment.Multiline,
+ T.Newline, T.Whitespace)):
raise ValueError("Incomplete SQL statement: %s" %
tokens)
return tokens