X-Git-Url: https://git.phdru.name/?a=blobdiff_plain;f=m_librarian%2Fdb.py;h=9a48caa8fdade56f6ca38f86f393ad410ef98b5d;hb=1b4e677c375c8d518574c84406428f1ccbcc6022;hp=7ae3ed30ae722812be7ab9db8abaf804fc9e8550;hpb=af6ee17b251b1ae074e1eca5d80c7a599ee88536;p=m_librarian.git diff --git a/m_librarian/db.py b/m_librarian/db.py index 7ae3ed3..9a48caa 100755 --- a/m_librarian/db.py +++ b/m_librarian/db.py @@ -1,82 +1,15 @@ #! /usr/bin/env python -__all__ = ['Author', 'Book', 'Extension', 'Genre', 'Language', - 'AuthorBook', 'BookGenre', 'open_db', 'init_db', - 'insert_name', 'insert_author', 'update_counters', - ] - import os from sqlobject import SQLObject, StringCol, UnicodeCol, IntCol, BoolCol, \ ForeignKey, DateCol, DatabaseIndex, RelatedJoin, \ connectionForURI, sqlhub, SQLObjectNotFound, dberrors from .config import get_config - -def _find_sqlite_db_dirs_posix(): - db_dirs = [] - if 'XDG_CACHE_HOME' in os.environ: - db_dirs.append(os.environ['XDG_CACHE_HOME']) - home_cache = os.path.expanduser('~/.cache') - if home_cache not in db_dirs: - db_dirs.append(home_cache) - return db_dirs - - -def find_sqlite_db_dirs(): - if os.name == 'posix': - return _find_sqlite_db_dirs_posix() - raise OSError("Unknow OS") - - -def find_sqlite_dburi(db_dirs=None): - if db_dirs is None: - db_dirs = find_sqlite_db_dirs() - for d in db_dirs: - db_file = os.path.join(d, 'm_librarian.sqlite') - if os.path.exists(db_file): - break - else: - # octal; -rw-------; - # make the database file/directory readable/writeable only by the user - os.umask(0066) - db_dir = db_dirs[0] - try: - os.makedirs(db_dir) - except OSError: # Perhaps already exists - pass - db_file = os.path.join(db_dir, 'm_librarian.sqlite') - - return 'sqlite://%s' % db_file.replace(os.sep, '/') - - -def open_db(db_uri=None): - if db_uri is None: - try: - db_uri = get_config().get('database', 'URI') - except: - db_uri = find_sqlite_dburi() - - sqlhub.processConnection = connection = connectionForURI(db_uri) - - if connection.dbName == 'sqlite': - def lower(s): - return s.lower() - - sqlite = connection.module - - class MLConnection(sqlite.Connection): - def __init__(self, *args, **kwargs): - super(MLConnection, self).__init__(*args, **kwargs) - self.create_function('lower', 1, lower) - - # This hack must be done at the very beginning, before the first query - connection._connOptions['factory'] = MLConnection - - # Speedup SQLite connection - connection.query("PRAGMA synchronous=OFF") - connection.query("PRAGMA count_changes=OFF") - connection.query("PRAGMA journal_mode=MEMORY") - connection.query("PRAGMA temp_store=MEMORY") +__all__ = ['Author', 'Book', 'Extension', 'Genre', 'Language', + 'AuthorBook', 'BookGenre', 'open_db', 'init_db', + 'insert_name', 'insert_author', 'update_counters', + ] class Author(SQLObject): @@ -143,13 +76,13 @@ class BookGenre(SQLObject): class Extension(SQLObject): - name = StringCol(notNull=True, unique=True) + name = UnicodeCol(notNull=True, unique=True) count = IntCol(notNull=True) count_idx = DatabaseIndex(count) class Genre(SQLObject): - name = StringCol(notNull=True, unique=True) + name = UnicodeCol(notNull=True, unique=True) title = UnicodeCol(notNull=True) count = IntCol(notNull=True) books = RelatedJoin('Book', otherColumn='book_id', @@ -160,11 +93,88 @@ class Genre(SQLObject): class Language(SQLObject): - name = StringCol(notNull=True, unique=True) + name = UnicodeCol(notNull=True, unique=True) count = IntCol(notNull=True) count_idx = DatabaseIndex(count) +def _find_sqlite_db_dirs_posix(): + db_dirs = [] + if 'XDG_CACHE_HOME' in os.environ: + db_dirs.append(os.environ['XDG_CACHE_HOME']) + home_cache = os.path.expanduser('~/.cache') + if home_cache not in db_dirs: + db_dirs.append(home_cache) + return db_dirs + + +def find_sqlite_db_dirs(): + if os.name == 'posix': + return _find_sqlite_db_dirs_posix() + raise OSError("Unknow OS") + + +def find_sqlite_dburi(db_dirs=None): + if db_dirs is None: + db_dirs = find_sqlite_db_dirs() + for d in db_dirs: + db_file = os.path.join(d, 'm_librarian.sqlite') + if os.path.exists(db_file): + break + else: + # octal; -rw-------; + # make the database file/directory readable/writeable only by the user + os.umask(0066) + db_dir = db_dirs[0] + try: + os.makedirs(db_dir) + except OSError: # Perhaps already exists + pass + db_file = os.path.join(db_dir, 'm_librarian.sqlite') + + return 'sqlite://%s' % db_file.replace(os.sep, '/') + + +def open_db(db_uri=None): + if db_uri is None: + try: + db_uri = get_config().get('database', 'URI') + except: + db_uri = find_sqlite_dburi() + + if db_uri.startswith(os.sep) or db_uri.startswith(os.altsep) \ + or db_uri.startswith(os.pardir + os.sep) \ + or db_uri.startswith(os.pardir + os.altsep): + if db_uri.startswith(os.pardir + os.sep) \ + or db_uri.startswith(os.pardir + os.altsep): + db_uri = os.path.abspath(db_uri) + db_uri = 'sqlite://' + db_uri.replace(os.sep, '/') + + sqlhub.processConnection = connection = connectionForURI(db_uri) + + if connection.dbName == 'sqlite': + def lower(s): + if isinstance(s, basestring): + return s.lower() + return s + + sqlite = connection.module + + class MLConnection(sqlite.Connection): + def __init__(self, *args, **kwargs): + super(MLConnection, self).__init__(*args, **kwargs) + self.create_function('lower', 1, lower) + + # This hack must be done at the very beginning, before the first query + connection._connOptions['factory'] = MLConnection + + # Speedup SQLite connection + connection.query("PRAGMA synchronous=OFF") + connection.query("PRAGMA count_changes=OFF") + connection.query("PRAGMA journal_mode=MEMORY") + connection.query("PRAGMA temp_store=MEMORY") + + def init_db(): try: Book.select()[0] @@ -198,13 +208,13 @@ def update_counters(): author.count = AuthorBook.select(AuthorBook.q.author == author).count() for ext in Extension.select(): - ext.count = Book.select(Book.q.extension == ext.name).count() + ext.count = Book.select(Book.q.extension == ext.id).count() for genre in Genre.select(): genre.count = BookGenre.select(BookGenre.q.genre == genre).count() for language in Language.select(): - language.count = Book.select(Book.q.language == language.name).count() + language.count = Book.select(Book.q.language == language.id).count() def test():