前面几个章节介绍了怎样搭建flask服务器,介绍了flask中如何实现HTTP的4种操作方法,但是前一章中是使用内存来保存数据,这样数据实际没有保存下来,而且也只能在单进程中使用,实际使用中我们需要数据库来为我们完成数据的相关操作,这一章将介绍如何在flask中使用数据库,以mysql为例;
flask有一个对数据库的扩展flask-sqlalchemy,它简化了在flask中对sqlalchemy的操作,sqlalchemy是一个强大的关系数据库框架,支持一些数据库后端,提供高级的ORM和底层访问数据库的本地sql功能;
使用pip来安装flask-sqlalchemy
1 |
pip install flask-sqlalchemy |
在flask-sqlalchemy中,sql被指定为URL,访问mysql的url如下:
1 |
mysql://username:[email protected]/database |
在URL中hostname是指托管mysql服务的服务器,可以使localhost也可以是远端服务器,数据库服务器可以托管多个数据库,所以database要指定连接的数据库名,username和password是数据库的身份验证;
应用程序数据库URL必须在flask配置对象中的SQLALCHEMY_DATABASE_URI中进行配置,另一个有用的选项是SQLALCHEMY_COMMIT_ON_TEARDOWN,可以设置为True来启动自动提交数据库更改在每个请求中:
1 2 3 4 5 6 7 8 9 10 11 |
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy import config databaseurl = 'mysql://%s:%s@%s:%s/%s' % (config.MYSQL_USER, config.MYSQL_PASS, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DB) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = databaseurl app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True db = SQLAlchemy(app) |
由SQLAlchemy实例化的db对象表示数据库且提供访问flask-sqlalchemy的所有功能;
flask_sqlalchemy使用模型来定义数据库表,一个模型通常是一个带有属性的python类,其属性与数据库表的列相匹配对应,flask-sqlalchemy数据库实例提供了一个基类以及一组辅助类和函数用于定义它的结构,如下定义了一个student模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class student(db.Model): __tablename__ = 'student' id = db.Column(db.Integer, primary_key=True, nullable=False) name = db.Column(db.String(20), nullable=False) age = db.Column(db.Integer, nullable=False) def __init__(self, id, name, age): self.id = id self.name = name self.age = age def __repr__(self): return '' % (self.id, self.name) |
__tablename__类变量定义数据库中表的名称,如果缺省,flask_sqlalchemy会指定默认表名,其余的变量是模型的属性,定义为db.Column类的实例;
传给db.Column构造函数的第一个参数是数据库列的类型也就是模型属性的数据类型:
类型名称 | python类型 | 描述 |
Integer | int | 常规整型,通常为32位 |
SmallInteger | int | 短整型,通常为16位 |
BigInteger | int或long | 精度不受限整型 |
Float | float | 浮点型 |
Numeric | decimal | 定点数 |
String | str | 可变长度字符串 |
Text | str | 可变长度字符串,适合大量文本 |
Unicode | unicode | 可变长度Unicode字符串 |
Boolean | bool | 布尔值 |
Date | datetime.date | 日期类型 |
Time | datetime.time | 时间类型 |
Daetime | datetime.datetime | 日期时间类型 |
Interval | datetime.timedate | 时间间隔 |
Enum | str | 字符列表 |
PickleType | 任意Python对象 | 自动Pickle序列化 |
LargeBinary | str | 二进制 |
其它的参数为每个属性指定了配置选项:
可选参数 | 描述 |
primary_key | 设置为True,该列为表的主键 |
unique | 设置为True,该列不允许相同的值 |
index | 设置为True,为该列创建索引 |
nullable | 设置为True,允许为空 |
default | 定义该列的默认值 |
一个简单的例子,使用POST插入学生信息,使用GET来获取信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from flask import Flask from flask import request from flask import jsonify from flask.ext.sqlalchemy import SQLAlchemy import config databaseurl = 'mysql://%s:%s@%s:%s/%s' % (config.MYSQL_USER, config.MYSQL_PASS, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DB) app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = databaseurl app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True db = SQLAlchemy(app) class mytable(db.Model): id = db.Column(db.Integer, primary_key=True, nullable=False) name = db.Column(db.String(20), nullable=False) age = db.Column(db.Integer, nullable=False) def __init__(self, id, name, age): self.id = id self.name = name self.age = age def __repr__(self): return '' % (self.id, self.name) db.create_all() @app.route('/', methods=['POST']) def hello(): if not request.json: return "failed!", 400 student = { 'id': request.json['id'], 'name': request.json['name'], 'age': request.json['age'] } #初始化student对象 stu = mytable(int(student['id']), student['name'], int(student['age'])) #将新增项目插入数据库 db.session.add(stu) #提交修改 db.session.commit() return "Hello World!" @app.route('/', methods=['GET']) def get_one(): if not request.args['id']: abort(400) get_id = request.args['id'] #得到表中所有的数据 ids = mytable.query.all() #使用filter找到指定项目 get = mytable.query.filter_by(id = get_id).first() #获取表成员属性 ret = 'id=%d,name=%s,age=%d' % (get.id, get.name, get.age) return ret app.run(debug = True) |
使用post上传信息后,数据或同步保存到数据库中,程序重启后依然能够从数据库中获取历史信息;