我想在现有的映射类中添加一个字段,如何自动更新sql表。如果将字段添加到类中,sqlalchemy是否提供了使用新列更新数据库的方法。
我想在现有的映射类中添加一个字段,如何自动更新sql表。如果将字段添加到类中,sqlalchemy是否提供了使用新列更新数据库的方法。
SQLAlchemy本身不支持模式的自动更新,但有第三方 SQLAlchemy Migrate 用于自动迁移的工具。但是看 “数据库模式版本控制工作流程”一章 看看它是如何工作的。
SQLAlchemy本身不支持模式的自动更新,但有第三方 SQLAlchemy Migrate 用于自动迁移的工具。但是看 “数据库模式版本控制工作流程”一章 看看它是如何工作的。
有时迁移工作太多 - 您只想在运行更改的代码时自动添加列。所以这是一个功能。
注意事项:它在SQLAlchemy内部发现并且每次SQLAlchemy经历重大修订时都需要进行小的更改。 (这可能是一个更好的方法 - 我不是SQLAlchemy专家)。它也不处理约束。
import logging
import re
import sqlalchemy
from sqlalchemy import MetaData, Table, exceptions
import sqlalchemy.engine.ddl
_new_sa_ddl = sqlalchemy.__version__.startswith('0.7')
def create_and_upgrade(engine, metadata):
"""For each table in metadata, if it is not in the database then create it.
If it is in the database then add any missing columns and warn about any columns
whose spec has changed"""
db_metadata = MetaData()
db_metadata.bind = engine
for model_table in metadata.sorted_tables:
try:
db_table = Table(model_table.name, db_metadata, autoload=True)
except exceptions.NoSuchTableError:
logging.info('Creating table %s' % model_table.name)
model_table.create(bind=engine)
else:
if _new_sa_ddl:
ddl_c = engine.dialect.ddl_compiler(engine.dialect, None)
else:
# 0.6
ddl_c = engine.dialect.ddl_compiler(engine.dialect, db_table)
# else:
# 0.5
# ddl_c = engine.dialect.schemagenerator(engine.dialect, engine.contextual_connect())
logging.debug('Table %s already exists. Checking for missing columns' % model_table.name)
model_columns = _column_names(model_table)
db_columns = _column_names(db_table)
to_create = model_columns - db_columns
to_remove = db_columns - model_columns
to_check = db_columns.intersection(model_columns)
for c in to_create:
model_column = getattr(model_table.c, c)
logging.info('Adding column %s.%s' % (model_table.name, model_column.name))
assert not model_column.constraints, \
'Arrrgh! I cannot automatically add columns with constraints to the database'\
'Please consider fixing me if you care!'
model_col_spec = ddl_c.get_column_specification(model_column)
sql = 'ALTER TABLE %s ADD %s' % (model_table.name, model_col_spec)
engine.execute(sql)
# It's difficult to reliably determine if the model has changed
# a column definition. E.g. the default precision of columns
# is None, which means the database decides. Therefore when I look at the model
# it may give the SQL for the column as INTEGER but when I look at the database
# I have a definite precision, therefore the returned type is INTEGER(11)
for c in to_check:
model_column = model_table.c[c]
db_column = db_table.c[c]
x = model_column == db_column
logging.debug('Checking column %s.%s' % (model_table.name, model_column.name))
model_col_spec = ddl_c.get_column_specification(model_column)
db_col_spec = ddl_c.get_column_specification(db_column)
model_col_spec = re.sub('[(][\d ,]+[)]', '', model_col_spec)
db_col_spec = re.sub('[(][\d ,]+[)]', '', db_col_spec)
db_col_spec = db_col_spec.replace('DECIMAL', 'NUMERIC')
db_col_spec = db_col_spec.replace('TINYINT', 'BOOL')
if model_col_spec != db_col_spec:
logging.warning('Column %s.%s has specification %r in the model but %r in the database' %
(model_table.name, model_column.name, model_col_spec, db_col_spec))
if model_column.constraints or db_column.constraints:
# TODO, check constraints
logging.debug('Column constraints not checked. I am too dumb')
for c in to_remove:
model_column = getattr(db_table.c, c)
logging.warning('Column %s.%s in the database is not in the model' % (model_table.name, model_column.name))
def _column_names(table):
# Autoloaded columns return unicode column names - make sure we treat all are equal
return set((unicode(i.name) for i in table.c))
蒸馏器 是提供数据库迁移的最新软件包。
# database.py has definition for engine.
# from sqlalchemy import create_engine
# engine = create_engine('mysql://......', convert_unicode=True)
from database import engine
from sqlalchemy import DDL
add_column = DDL('ALTER TABLE USERS ADD COLUMN city VARCHAR(60) AFTER email')
engine.execute(add_column)