Merge pull request #1 from alexanderdidenko/patch-1

Update DatabaseWrapper class to operate similarly to the standard MySQL backend
This commit is contained in:
R. Tyler Croy 2011-09-07 17:27:34 -07:00
commit 8d02315985
1 changed files with 111 additions and 28 deletions

View File

@ -22,7 +22,6 @@ import mysql.connector.conversion
import re
from django.db.backends import *
from django.db.backends.mysql import base as mysqldb_base
from django.db.backends.mysql.client import DatabaseClient
from django.db.backends.mysql.creation import DatabaseCreation
from django.db.backends.mysql.introspection import DatabaseIntrospection
@ -31,12 +30,9 @@ from django.utils.safestring import SafeString, SafeUnicode
# Raise exceptions for database warnings if DEBUG is on
from django.conf import settings
# NOTE: Disabling this since `mysql.connector.Database.Warning` is a
# subclass of StandardError instead of a "warning"
if False: #settings.DEBUG:
from warnings import filterwarnings
filterwarnings("error", category=Database.Warning)
#if settings.DEBUG:
# from warnings import filterwarnings
# filterwarnings("error", category=Database.Warning)
DatabaseError = Database.DatabaseError
IntegrityError = Database.IntegrityError
@ -47,7 +43,7 @@ class DjangoMySQLConverter(Database.conversion.MySQLConverter):
"""
def _TIME_to_python(self, v, dsc=None):
return util.typecast_time(v)
def _decimal(self, v, desc=None):
return util.typecast_decimal(v)
"""
@ -65,11 +61,98 @@ server_version_re = re.compile(r'(\d{1,2})\.(\d{1,2})\.(\d{1,2})')
# TRADITIONAL will automatically cause most warnings to be treated as errors.
class DatabaseFeatures(BaseDatabaseFeatures):
allows_group_by_pk = True
autoindexes_primary_keys = False
inline_fk_references = False
related_fields_match_type = True
update_can_self_select = False
class DatabaseOperations(BaseDatabaseOperations):
def date_extract_sql(self, lookup_type, field_name):
# http://dev.mysql.com/doc/mysql/en/date-and-time-functions.html
return "EXTRACT(%s FROM %s)" % (lookup_type.upper(), field_name)
def date_trunc_sql(self, lookup_type, field_name):
fields = ['year', 'month', 'day', 'hour', 'minute', 'second']
format = ('%%Y-', '%%m', '-%%d', ' %%H:', '%%i', ':%%s') # Use double percents to escape.
format_def = ('0000-', '01', '-01', ' 00:', '00', ':00')
try:
i = fields.index(lookup_type) + 1
except ValueError:
sql = field_name
else:
format_str = ''.join([f for f in format[:i]] + [f for f in format_def[i:]])
sql = "CAST(DATE_FORMAT(%s, '%s') AS DATETIME)" % (field_name, format_str)
return sql
def drop_foreignkey_sql(self):
return "DROP FOREIGN KEY"
def fulltext_search_sql(self, field_name):
return 'MATCH (%s) AGAINST (%%s IN BOOLEAN MODE)' % field_name
def limit_offset_sql(self, limit, offset=None):
# 'LIMIT 20,40'
sql = "LIMIT "
if offset and offset != 0:
sql += "%s," % offset
return sql + str(limit)
def quote_name(self, name):
if name.startswith("`") and name.endswith("`"):
return name # Quoting once is enough.
return "`%s`" % name
def random_function_sql(self):
return 'RAND()'
def sql_flush(self, style, tables, sequences):
# NB: The generated SQL below is specific to MySQL
# 'TRUNCATE x;', 'TRUNCATE y;', 'TRUNCATE z;'... style SQL statements
# to clear all tables of all data
if tables:
sql = ['SET FOREIGN_KEY_CHECKS = 0;']
for table in tables:
sql.append('%s %s;' % (style.SQL_KEYWORD('TRUNCATE'), style.SQL_FIELD(self.quote_name(table))))
sql.append('SET FOREIGN_KEY_CHECKS = 1;')
# 'ALTER TABLE table AUTO_INCREMENT = 1;'... style SQL statements
# to reset sequence indices
sql.extend(["%s %s %s %s %s;" % \
(style.SQL_KEYWORD('ALTER'),
style.SQL_KEYWORD('TABLE'),
style.SQL_TABLE(self.quote_name(sequence['table'])),
style.SQL_KEYWORD('AUTO_INCREMENT'),
style.SQL_FIELD('= 1'),
) for sequence in sequences])
return sql
else:
return []
def value_to_db_datetime(self, value):
if value is None:
return None
# MySQL doesn't support tz-aware datetimes
if value.tzinfo is not None:
raise ValueError("MySQL backend does not support timezone-aware datetimes.")
# MySQL doesn't support microseconds
return unicode(value.replace(microsecond=0))
def value_to_db_time(self, value):
if value is None:
return None
# MySQL doesn't support tz-aware datetimes
if value.tzinfo is not None:
raise ValueError("MySQL backend does not support timezone-aware datetimes.")
# MySQL doesn't support microseconds
return unicode(value.replace(microsecond=0))
def year_lookup_bounds(self, value):
# Again, no microseconds
first = '%s-01-01 00:00:00'
second = '%s-12-31 23:59:59.99'
return [first % value, second % value]
class DatabaseWrapper(BaseDatabaseWrapper):
@ -93,14 +176,14 @@ class DatabaseWrapper(BaseDatabaseWrapper):
def __init__(self, *args, **kwargs):
super(DatabaseWrapper, self).__init__(*args, **kwargs)
self.server_version = None
self.features = DatabaseFeatures()
self.ops = mysqldb_base.DatabaseOperations()
self.server_version = None
self.features = DatabaseFeatures(self)
self.ops = DatabaseOperations()
self.client = DatabaseClient(self)
self.creation = DatabaseCreation(self)
self.introspection = DatabaseIntrospection(self)
self.validation = DatabaseValidation()
self.validation = DatabaseValidation(self)
def _valid_connection(self):
if self.connection is not None:
@ -120,19 +203,19 @@ class DatabaseWrapper(BaseDatabaseWrapper):
'use_unicode': True,
}
settings_dict = self.settings_dict
if settings_dict['DATABASE_USER']:
kwargs['user'] = settings_dict['DATABASE_USER']
if settings_dict['DATABASE_NAME']:
kwargs['db'] = settings_dict['DATABASE_NAME']
if settings_dict['DATABASE_PASSWORD']:
kwargs['password'] = settings_dict['DATABASE_PASSWORD']
if settings_dict['DATABASE_HOST'].startswith('/'):
kwargs['unix_socket'] = settings_dict['DATABASE_HOST']
elif settings_dict['DATABASE_HOST']:
kwargs['host'] = settings_dict['DATABASE_HOST']
if settings_dict['DATABASE_PORT']:
kwargs['port'] = int(settings_dict['DATABASE_PORT'])
kwargs.update(settings_dict['DATABASE_OPTIONS'])
if settings_dict['USER']:
kwargs['user'] = settings_dict['USER']
if settings_dict['NAME']:
kwargs['db'] = settings_dict['NAME']
if settings_dict['PASSWORD']:
kwargs['passwd'] = settings_dict['PASSWORD']
if settings_dict['HOST'].startswith('/'):
kwargs['unix_socket'] = settings_dict['HOST']
elif settings_dict['HOST']:
kwargs['host'] = settings_dict['HOST']
if settings_dict['PORT']:
kwargs['port'] = int(settings_dict['PORT'])
kwargs.update(settings_dict['OPTIONS'])
self.connection = Database.connect(**kwargs)
self.connection.set_converter_class(DjangoMySQLConverter)
cursor = self.connection.cursor()