from
flask
import
Flask, render_template, request, redirect, url_for, session, flash, abortfrom flask_mail
import
Mail, Message
from
extensions
import
db
from
models
import
Room, Booking
from
config
import
Config
from
datetime
import
datetime, timedelta
import
re
import
uuid
app
=
Flask(__name__)
app.config.from_object(Config)
db.init_app(app)
mail
=
Mail(app)
@app
.context_processor
def
inject_global_vars():
def
generate_time_slots():
return
[f
"{h:02d}:{m}"
for
h
in
range
(
8
,
21
)
for
m
in
(
'00'
,
'30'
)]
min_date
=
(datetime.now()
+
timedelta(hours
=
1
)).strftime(
'%Y-%m-%d'
)
return
dict
(time_slots
=
generate_time_slots(), min_date
=
min_date)
@app
.errorhandler(
404
)
def
page_not_found(e):
return
render_template(
'404.html'
),
404
@app
.route(
'/'
)
def
index():
rooms
=
Room.query.filter_by(is_active
=
True
).
all
()
min_date
=
(datetime.now()
+
timedelta(hours
=
1
)).strftime(
'%Y-%m-%d'
)
return
render_template(
'index.html'
, rooms
=
rooms, min_date
=
min_date)
@app
.route(
'/book'
, methods
=
[
'POST'
])
def
book_room():
try
:
booking_date
=
request.form[
'booking_date'
]
start_time_str
=
f
"{booking_date} {request.form['start_time']}"
end_time_str
=
f
"{booking_date} {request.form['end_time']}"
applicant_name
=
request.form.get(
'applicant_name'
, '').strip()
company
=
request.form.get(
'company'
, '').strip()
phone
=
request.form.get(
'phone'
, '').strip()
purpose
=
request.form.get(
'purpose'
, '').strip()
start_time
=
datetime.strptime(start_time_str,
'%Y-%m-%d %H:%M'
)
end_time
=
datetime.strptime(end_time_str,
'%Y-%m-%d %H:%M'
)
if
len
(applicant_name) <
2
or
len
(applicant_name) >
50
:
flash(
"姓名需为2-50个字符"
,
"danger"
)
return
redirect(url_for(
'index'
))
if
len
(company) <
2
or
len
(company) >
100
:
flash(
"公司名称需为2-100个字符"
,
"danger"
)
return
redirect(url_for(
'index'
))
if
not
re.match(r
'^1[3-9]\d{9}$'
, phone):
flash(
"请输入有效的中国大陆手机号码"
,
"danger"
)
return
redirect(url_for(
'index'
))
now
=
datetime.now()
if
start_time < now
+
timedelta(hours
=
1
):
flash(
"请至少提前1小时预约"
,
"danger"
)
return
redirect(url_for(
'index'
))
if
end_time <
=
start_time:
flash(
"结束时间必须晚于开始时间"
,
"danger"
)
return
redirect(url_for(
'index'
))
conflicting_booking
=
Booking.query.
filter
(
Booking.room_id
=
=
request.form[
'room_id'
],
Booking.start_time < end_time,
Booking.end_time > start_time,
Booking.status
=
=
'已批准'
).first()
if
conflicting_booking:
flash(
"该时间段已被预约"
,
"danger"
)
return
redirect(url_for(
'index'
))
booking
=
Booking(
email
=
request.form[
'email'
],
room_id
=
request.form[
'room_id'
],
start_time
=
start_time,
end_time
=
end_time,
cancel_code
=
str
(uuid.uuid4()),
applicant_name
=
applicant_name,
company
=
company,
phone
=
phone,
purpose
=
purpose
)
if
not
booking.validate_phone():
flash(
"手机号码格式错误"
,
"danger"
)
return
redirect(url_for(
'index'
))
db.session.add(booking)
db.session.commit()
send_confirmation_email(booking)
flash(
"预约申请已提交,请查收邮件确认"
,
"success"
)
return
redirect(url_for(
'index'
, booking_success
=
'true'
))
except
Exception as e:
db.session.rollback()
app.logger.error(f
"预约错误: {str(e)}"
)
flash(
"预约失败,请检查输入信息"
,
"danger"
)
return
redirect(url_for(
'index'
))
def
send_confirmation_email(booking):
try
:
msg
=
Message(
"会议室预约确认"
,
sender
=
app.config[
'MAIL_USERNAME'
],
recipients
=
[booking.email])
msg.body
=
render_template(
'emails/confirmation.txt'
,
booking
=
booking,
applicant_name
=
booking.applicant_name,
company
=
booking.company,
phone
=
booking.phone,
purpose
=
booking.purpose,
cancel_url
=
url_for(
'cancel_booking'
, code
=
booking.cancel_code, _external
=
True
)
)
mail.send(msg)
except
Exception as e:
app.logger.error(f
"邮件发送失败: {str(e)}"
)
def
send_approval_email(booking):
try
:
msg
=
Message(
"会议室预约已通过"
,
sender
=
app.config[
'MAIL_USERNAME'
],
recipients
=
[booking.email]
)
msg.html
=
render_template(
'emails/approval.html'
,
booking
=
booking,
contact_email
=
app.config[
'MAIL_USERNAME'
]
)
mail.send(msg)
app.logger.info(f
"已发送审批通过邮件至 {booking.email}"
)
except
Exception as e:
app.logger.error(f
"审批通过邮件发送失败: {str(e)}"
)
@app
.route(
'/admin/bookings/approve/<int:id>'
)
def
approve_booking_reason(
id
):
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
try
:
booking
=
Booking.query.get_or_404(
id
)
booking.status
=
'已批准'
db.session.commit()
send_approval_email(booking)
flash(
"预约已批准并发送确认邮件"
,
"success"
)
except
Exception as e:
db.session.rollback()
app.logger.error(f
"审批操作失败: {str(e)}"
)
flash(
"操作失败,请检查日志"
,
"danger"
)
return
redirect(url_for(
'manage_bookings'
))
def
send_rejection_email(booking, reason
=
None
):
try
:
msg
=
Message(
"会议室预约未通过审批"
,
sender
=
app.config[
'MAIL_USERNAME'
],
recipients
=
[booking.email])
msg.html
=
render_template(
'emails/rejection.html'
,
booking
=
booking,
reason
=
reason,
contact_email
=
app.config[
'MAIL_USERNAME'
]
)
mail.send(msg)
app.logger.info(f
"已发送拒绝通知至 {booking.email}"
)
except
Exception as e:
app.logger.error(f
"拒绝邮件发送失败: {str(e)}"
)
@app
.route(
'/admin/bookings/reject/<int:id>'
, methods
=
[
'GET'
,
'POST'
])
def
reject_booking_with_reason(
id
):
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
booking
=
Booking.query.get_or_404(
id
)
if
request.method
=
=
'POST'
:
rejection_reason
=
request.form.get(
'reason'
,
'不符合会议室使用规定'
)
booking.status
=
'已拒绝'
db.session.commit()
send_rejection_email(booking, rejection_reason)
flash(
"已拒绝该预约并通知申请人"
,
"success"
)
return
redirect(url_for(
'manage_bookings'
))
return
render_template(
'admin/reject_reason.html'
, booking
=
booking)
@app
.route(
'/cancel/<string:code>'
)
def
cancel_booking(code):
booking
=
Booking.query.filter_by(cancel_code
=
code).first()
if
booking:
room_name
=
booking.room.name
time_str
=
booking.start_time.strftime(
'%Y-%m-%d %H:%M'
)
db.session.delete(booking)
db.session.commit()
try
:
msg
=
Message(
"预约取消确认"
,
sender
=
app.config[
'MAIL_USERNAME'
],
recipients
=
[booking.email])
msg.body
=
f
mail.send(msg)
except
Exception as e:
app.logger.error(f
"取消确认邮件发送失败: {str(e)}"
)
return
render_template(
'bookings/cancel.html'
, booking
=
booking)
return
render_template(
'bookings/cancel.html'
, booking
=
None
),
404
@app
.route(
'/admin'
)
def
admin_dashboard():
if
not
session.get(
'admin_logged_in'
):
return
redirect(url_for(
'admin_login'
))
room_count
=
Room.query.count()
pending_booking_count
=
Booking.query.filter_by(status
=
'待审批'
).count()
return
render_template(
'admin/dashboard.html'
,
room_count
=
room_count,
pending_booking_count
=
pending_booking_count)
@app
.route(
'/admin/login'
, methods
=
[
'GET'
,
'POST'
])
def
admin_login():
if
request.method
=
=
'POST'
:
username
=
request.form.get(
'username'
)
password
=
request.form.get(
'password'
)
if
username
=
=
app.config[
'ADMIN_USERNAME'
]
and
password
=
=
app.config[
'ADMIN_PASSWORD'
]:
session[
'admin_logged_in'
]
=
True
return
redirect(url_for(
'admin_dashboard'
))
flash(
"用户名或密码错误"
,
"danger"
)
return
render_template(
'auth/login.html'
)
@app
.route(
'/admin/logout'
)
def
admin_logout():
session.pop(
'admin_logged_in'
,
None
)
return
redirect(url_for(
'index'
))
@app
.route(
'/admin/rooms'
)
def
manage_rooms():
if
not
session.get(
'admin_logged_in'
):
return
redirect(url_for(
'admin_login'
))
rooms
=
Room.query.
all
()
return
render_template(
'admin/rooms.html'
, rooms
=
rooms)
@app
.route(
'/admin/room/add'
, methods
=
[
'POST'
])
def
add_room():
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
try
:
room
=
Room(
name
=
request.form[
'name'
],
capacity
=
int
(request.form[
'capacity'
]),
equipment
=
request.form.get(
'equipment'
, '')
)
db.session.add(room)
db.session.commit()
flash(
"会议室添加成功"
,
"success"
)
except
Exception as e:
db.session.rollback()
app.logger.error(f
"添加会议室失败: {str(e)}"
)
flash(
"添加会议室失败"
,
"danger"
)
return
redirect(url_for(
'manage_rooms'
))
@app
.route(
'/admin/room/delete/<int:id>'
)
def
delete_room(
id
):
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
room
=
Room.query.get_or_404(
id
)
try
:
db.session.delete(room)
db.session.commit()
flash(
"会议室删除成功"
,
"success"
)
except
Exception as e:
db.session.rollback()
app.logger.error(f
"删除失败: {str(e)}"
)
flash(
"删除失败,请先处理相关预约"
,
"danger"
)
return
redirect(url_for(
'manage_rooms'
))
@app
.route(
'/admin/bookings'
)
def
manage_bookings():
if
not
session.get(
'admin_logged_in'
):
return
redirect(url_for(
'admin_login'
))
bookings
=
Booking.query.order_by(Booking.start_time.desc()).
all
()
return
render_template(
'admin/bookings.html'
,
bookings
=
bookings,
datetime
=
datetime)
bookings
=
Booking.query.order_by(Booking.created_at.desc()).
all
()
return
render_template(
'admin/bookings.html'
, bookings
=
bookings)
@app
.route(
'/admin/bookings/approve/<int:id>'
)
def
approve_booking(
id
):
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
booking
=
Booking.query.get_or_404(
id
)
booking.status
=
'已批准'
db.session.commit()
return
redirect(url_for(
'manage_bookings'
))
@app
.route(
'/admin/bookings/reject/<int:id>'
)
def
reject_booking(
id
):
if
not
session.get(
'admin_logged_in'
):
abort(
403
)
booking
=
Booking.query.get_or_404(
id
)
booking.status
=
'已拒绝'
db.session.commit()
return
redirect(url_for(
'manage_bookings'
))
@app
.route(
'/calendar'
)
def
booking_calendar():
current_time
=
datetime.now()
approved_bookings
=
Booking.query.
filter
(
Booking.status
=
=
'已批准'
,
Booking.end_time > current_time
).order_by(Booking.start_time.asc()).
all
()
bookings_dict
=
{}
for
booking
in
approved_bookings:
date_str
=
booking.start_time.strftime(
'%Y-%m-%d'
)
if
date_str
not
in
bookings_dict:
bookings_dict[date_str]
=
{}
if
booking.room.name
not
in
bookings_dict[date_str]:
bookings_dict[date_str][booking.room.name]
=
[]
bookings_dict[date_str][booking.room.name].append({
'start'
: booking.start_time.strftime(
'%H:%M'
),
'end'
: booking.end_time.strftime(
'%H:%M'
),
'applicant'
: booking.applicant_name
})
return
render_template(
'calendar.html'
,
bookings
=
bookings_dict,
dates
=
sorted
(bookings_dict.keys()),
datetime
=
datetime)
if
__name__
=
=
'__main__'
:
with app.app_context():
db.create_all()
app.run(host
=
'0.0.0.0'
, port
=
5000
, debug
=
True
)