Flask学习笔记(二)
2024-06-17 / 刘辉

通过SQLAlchemy来操作数据库,SQLAlchemy 是一个强大的 Python SQL 工具包和对象关系映射 (ORM) 系统,它允许开发者以面向对象的方式与数据库进行交互。

安装flask-sqlalchemy

pip install flask-sqlalchemy

1
2
3
4
5
6
7
8
9
10
11
12
13
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 替换为你的密钥
# 数据库连接字符串,格式为:数据库类型://用户名:密码@主机名:端口/数据库名
# 例如:mysql://root:password@localhost:3306/mydatabase
# 这里使用的是sqlite数据库,使用mysql需安装pymysql库
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:password@localhost:3306/mydatabase'
app.config['SQLALCHEMY_DATABASE_URI']= 'sqlite:///mydatabase.db'
#是否追踪数据库修改(开启后会触发一些钩子函数) 一般不开启, 会影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 是否打印SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)

到这里初始化配置完成,’sqlite:///mydatabase.db’这个地址会默认在项目根目录下创建一个instance目录,里面会生成一个mydatabase.db文件,这个文件就是数据库文件。
现在我们定义模型,也就是定义数据库的表结构,这里我们定义一个User表和UserInfo表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class UserInfo(db.Model): 
__tablename__ = 'userinfo'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)#自增主键
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email
}
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True,autoincrement=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'password': self.password
}

一个类是一个表,类中的属性就是表的字段,类的实例就是一条记录。
在定义完模型之后,我们需要创建数据库,可以通过db.create_all()方法来创建数据库,这个方法会根据模型定义来创建数据库,如果数据库已经存在,则会创建表,如果表已经存在,则会创建字段,如果字段已经存在,则会创建索引。
建表初始化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
with app.app_context():#创建连接
db.drop_all() #删除表
db.create_all()#创建表
users_list = {
"admin": "admin123",
"user1": "123456",
"user2": "123456"
}
users_info = [{ 'username': 'admin', 'email': 'admin@example.com'},
{ 'username': 'user1', 'email': 'user1@example.com'},
{'username': 'user2', 'email': 'user2@example.com'}]
for k,v in users_list.items():
user = User(username=k, password=v)
db.session.add(user)
db.session.commit()
for i in range(len(users_info)):
user_info=UserInfo(username=users_info[i]['username'], email=users_info[i]['email'])
db.session.add(user_info)
db.session.commit()

查询语法:

1
2
3
4
5
6
7
8
9
10
#1.查询所有记录:
users = User.query.all()
for user in users:
print(user.to_dict())
#2.查询单个记录:
user = User.query.fiter_by(username='admin').first()
print(user.to_dict())
#3.查询记录总数:
count = User.query.count()
print(count)

删除与更新语法:

1
2
3
4
5
6
7
user = User.query.filter_by(username='admin').first()
db.session.delete(user)
db.session.commit()
更新语法:
user = User.query.filter_by(username='admin').first()
user.password = 'admin1234'
db.session.commit()

注意:
db.session.add(user)添加记录到数据库,db.session.delete(user)删除记录,db.session.commit()提交到数据库。
做了操作后,需要提交到数据库,否则不会生效。

更新后Flask-demo的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import os
from datetime import datetime, timedelta
from functools import wraps

import jwt
from flask import Flask
from flask import jsonify
import logging
from flask_sqlalchemy import SQLAlchemy


logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

app = Flask(__name__)
# 替换为你的密钥
# 数据库连接字符串,格式为:数据库类型://用户名:密码@主机名:端口/数据库名
# 例如:mysql://root:password@localhost:3306/mydatabase
# 这里使用的是sqlite数据库,使用mysql需安装pymysql库
# app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:password@localhost:3306/mydatabase'
app.config['SQLALCHEMY_DATABASE_URI']= 'sqlite:///mydatabase.db'
#是否追踪数据库修改(开启后会触发一些钩子函数) 一般不开启, 会影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 是否打印SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
class UserInfo(db.Model):
__tablename__ = 'userinfo'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)#自增主键
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email
}
class User(db.Model):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True,autoincrement=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'password': self.password
}
"""
建表初始化数据

with app.app_context():#创建连接
db.drop_all() #删除表
db.create_all()#创建表
users_list = {
"admin": "admin123",
"user1": "123456",
"user2": "123456"
}
users_info = [{ 'username': 'admin', 'email': 'admin@example.com'},
{ 'username': 'user1', 'email': 'user1@example.com'},
{'username': 'user2', 'email': 'user2@example.com'}]
for k,v in users_list.items():
user = User(username=k, password=v)
db.session.add(user)
db.session.commit()
for i in range(len(users_info)):
user_info=UserInfo(username=users_info[i]['username'], email=users_info[i]['email'])
db.session.add(user_info)
db.session.commit()

"""




# 假设有一个用户数据库,这里简化为字典
# users_list = {
# "admin": "admin123",
# 'user1': '123456',
# 'user2': '123456',
# # ...
# }
# users_info = {
# "admin": {'id': 0, 'username': 'admin', 'email': 'admin@example.com'},
# 'user1': {'id': 1, 'username': 'user1', 'email': 'user1@example.com'},
# 'user2': {'id': 2, 'username': 'user2', 'email': 'user2@example.com'},
# }

from flask import request, send_from_directory
from werkzeug.utils import secure_filename

path=os.getcwd()

app.config['UPLOAD_FOLDER'] = path # 替换为你的上传目录
app.config['ALLOWED_EXTENSIONS'] = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif','xlsx','docx'} # 允许的文件扩展名

def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']

def authenticate(username, password):
"""
验证用户名和密码
:param username: 用户名
:param password: 密码
:return: 用户名或None
"""
user = User.query.filter_by(username=username).first()
# if username in users_list and users_list[username] == password:
# return username
if user and user.password == password:
return user.username
return None

SECRET_KEY = 'your_secret_key'
def create_token(identity):
payload = {
'sub': identity,
'exp': datetime.utcnow() + timedelta(minutes=30)
}
return jwt.encode(payload, SECRET_KEY, algorithm='HS256')


def get_user_from_db(username):
user = User.query.filter_by(username=username).first()
return user.username


def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token is missing'}), 401

try:
token = token.split(" ")[1]
data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
current_user = data['sub']
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token 已过期'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': '无效 token'}), 401

return f(current_user, *args, **kwargs)

return decorated


@app.route('/register', methods=['POST'])
@token_required
def register(*args, **kwargs):
data = request.get_json()
try:
username = data.get('username')
password = data.get('password')
email = data.get('email')
except:
return jsonify("参数错误"), 405
if User.query.filter_by(username=username).first():
return jsonify("用户名已存在"), 400
else:
user = User(username=username, password=password)
users_info=UserInfo(username=username, email=email)
db.session.add(user)
db.session.add(users_info)
# users_list[username] = password
# users_info[username] = {'id': len(users_list) + 1, 'username': username, 'email': email}
return jsonify({"message": "创建成功"})

@app.route('/file/upload', methods=['POST'])
@token_required
def upload_file(*args, **kwargs):
if 'file' not in request.files:
return "No file part", 400
file = request.files['file']
if file.filename == '':
return "No selected file", 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return jsonify({
"message":"上传成功",
"data":{
"上传地址":os.path.join(app.config['UPLOAD_FOLDER'], filename)
}
})
else:
return "Invalid file type", 400

@app.route('/file/download/<filename>', methods=['GET'])
@token_required
def download_file(current_user,filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)



@app.route('/login', methods=['POST', 'GET'])
def login():
if request.method == "GET":
username = request.args.get('username')
password = request.args.get('password')
if request.method == "POST":
data = request.get_json()
username = data.get('username')
password = data.get('password')
authenticated_user = authenticate(username, password)
if authenticated_user:
token = create_token(authenticated_user)
return jsonify({'token': token,
'message':'登录成功',
'code':1
})
else:
return jsonify({'error': '用户名或密码错误'}), 401


@app.route('/api/user', methods=['GET'])
@token_required
def get_user(current_user):
# 这里假设你有一个函数可以从数据库中获取用户信息,比如通过用户名
user_data = get_user_from_db(current_user)
if user_data:
return jsonify({
'username': user_data,
'message': '获取用户信息成功'
}), 200
else:
return jsonify({'message': 'User not found'}), 404

@app.route('/api/user/list', methods=['GET'])
@token_required
def get_user_list_info(current_user):
users_info = UserInfo.query.all()
users_info = [user.to_dict() for user in users_info]
return jsonify({"message":"请求成功",
"data":users_info})

@app.route('/abc', methods=['GET', 'POST'])
def index1():
users_info = UserInfo.query.all()
return jsonify({"message": "请求成功",
"data": users_info})
@app.route('/api/upload', methods=['PUT'])
@token_required
def upload(current_user):
try:
password = request.form.get('password')
if password == None:
return jsonify({"message": "参数或格式错误"}), 405
email = request.form.get('email')
except:
return jsonify({"message": "参数或格式错误"}), 405
try:
user = User.query.filter_by(username=current_user).first()
# users_list[current_user] = password
# users_info[current_user] = {'id': users_info[current_user]['id'], 'username': current_user, 'email': email}
user.password = password
users_info=UserInfo.query.filter_by(username=current_user).first()
users_info.email = email
db.session.commit()
except:
return jsonify({"message": "更新失败,程序错误"}), 500
return jsonify({"message": "更新成功",
"data": users_info.to_dict()})

@app.route('/api/delete/<user_id>', methods=['DELETE'])
@token_required
def delete(current_user,user_id):
try:
user_all=User.query.all()
[print(i.to_dict()) for i in user_all]

user = User.query.filter_by(id=user_id).first()
db.session.delete(user)
# for k,v in users_info.items():
# if str(v['id'])==user_id:
# users_info.pop(k)
db.session.commit()
return jsonify({"message":"删除成功",
"data":user.to_dict()})
except:
return jsonify({"message": "程序错误"}),500




if __name__ == '__main__':

api_name = {
"登录接口": '/login',
"获取当前用户信息": "/api/user",
"注册用户": "/register",
"修改用户信息":"/api/upload",
"删除用户":"/api/delete/<user_id>",
"文件上传":"/file/upload",
"文件下载": "/file/download/<filename>",
"获取所有用户信息":"/api/user/list"
}
api_method = {
"登录接口": 'POST、GET',
"获取当前用户信息": "GET",
"注册用户": "POST",
"修改用户信息": "PUT(form)",
"删除用户":"DELETE(path)",
"文件上传": "POST(file)",
"文件下载": "GET",
"获取所有用户信息": "GET"
}
api_key = {
"登录接口": 'username、password',
"获取当前用户信息": "无",
"注册用户": "username、password、email",
"修改用户信息":"password、email",
"删除用户":"无",
"文件上传": "file",
"文件下载": "filename",
"获取所有用户信息": "无"
}
print("接口列表:")
[print(f"{k}===>{v}") for k,v in api_name.items()]
print("="*50)

print("请求方式:")
[print(f"{k}===>{v}") for k, v in api_method.items()]
print("=" * 50)

print("请求体内容:")
[print(f"{k}===>{v}") for k, v in api_key.items()]
print("=" * 50)

print("管理员账号","admin/admin123")
input("输入任意键开始")
app.run(debug=True)

Flask的学习后面有机会再继续,暂时先到这里。

本文链接:https://xiamu9527.cn/2024/06/17/Flask%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%8C%EF%BC%89/index.html