14 Build A Rest Api Frontend
15 REST API용 프런트엔드 구축
이 저장소에는 REST API용 프런트 엔드 구축에 대한 Real Python 튜토리얼과 관련된 코드가 포함되어 있습니다.
15.1 설정
먼저 가상 환경을 만들어야 합니다.
$ python -m venv venv
$ source venv/bin/activate
requirements.txt에서 고정된 종속성을 설치합니다.
(venv) $ python -m pip install -r requirements.txt
그런 다음 source_code_start/ 폴더로 이동하여 새 데이터베이스를 생성할 수 있습니다.
(venv) $ cd source_code_start
(venv) $ python init_database.py그러면 프로젝트에 사용할 수 있는 people.db 데이터베이스가 생성되거나 업데이트됩니다.
데이터베이스를 구축한 후 Flask 서버를 시작할 수 있습니다.
(venv) $ python app.pyFlask 서버가 실행 중일 때 http://localhost:8000에서 프런트 엔드를 방문하고 http://localhost:8000/api/ui에서 API 문서를 방문할 수 있습니다.
15.2 작가
- Philipp Acsany, 이메일: philipp@realpython.com
15.3 라이선스
MIT 라이센스에 따라 배포됩니다. 자세한 내용은 LICENSE를 참조하세요.
15.4 파일: source_code_final/app.py
import config
from flask import render_template
from models import Person
app = config.connex_app
app.add_api(config.basedir / "swagger.yml")
@app.route("/")
def home():
people = Person.query.all()
return render_template("home.html", people=people)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, debug=True)15.5 파일: source_code_final/config.py
import pathlib
import connexion
from flask_marshmallow import Marshmallow
from flask_sqlalchemy import SQLAlchemy
basedir = pathlib.Path(__file__).parent.resolve()
connex_app = connexion.App(__name__, specification_dir=basedir)
app = connex_app.app
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{basedir / 'people.db'}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
ma = Marshmallow(app)15.6 파일: source_code_final/init_database.py
from datetime import datetime
from config import app, db
from models import Note, Person
from sqlalchemy.exc import OperationalError
PEOPLE_NOTES = [
{
"lname": "Fairy",
"fname": "Tooth",
"notes": [
("I brush my teeth after each meal.", "2022-01-06 17:10:24"),
(
"The other day a friend said, I have big teeth.",
"2022-03-05 22:17:54",
),
("Do you pay per gram?", "2022-03-05 22:18:10"),
],
},
{
"lname": "Ruprecht",
"fname": "Knecht",
"notes": [
(
"I swear, I'll do better this year.",
"2022-01-01 09:15:03",
),
(
"Really! Only good deeds from now on!",
"2022-02-06 13:09:21",
),
],
},
{
"lname": "Bunny",
"fname": "Easter",
"notes": [
(
"Please keep the current inflation rate in mind!",
"2022-01-07 22:47:54",
),
("No need to hide the eggs this time.", "2022-04-06 13:03:17"),
],
},
]
def get_data_from_table(model):
try:
data = db.session.query(model).all()
db.session.close()
return data
except OperationalError:
return []
def create_database(db):
db.create_all()
for data in PEOPLE_NOTES:
new_person = Person(lname=data.get("lname"), fname=data.get("fname"))
for content, timestamp in data.get("notes", []):
new_person.notes.append(
Note(
content=content,
timestamp=datetime.strptime(
timestamp, "%Y-%m-%d %H:%M:%S"
),
)
)
db.session.add(new_person)
db.session.commit()
print("Created new database")
def update_database(db, existing_people, existing_notes):
db.drop_all()
db.create_all()
for person in existing_people:
db.session.merge(person)
for note in existing_notes:
db.session.merge(note)
db.session.commit()
print("Updated existing database")
with app.app_context():
existing_people = get_data_from_table(Person)
existing_notes = get_data_from_table(Note)
if not existing_people:
create_database(db)
else:
update_database(db, existing_people, existing_notes)15.7 파일: source_code_final/models.py
from datetime import datetime
from config import db, ma
from marshmallow_sqlalchemy import fields
class Note(db.Model):
__tablename__ = "note"
id = db.Column(db.Integer, primary_key=True)
person_id = db.Column(db.Integer, db.ForeignKey("person.id"))
content = db.Column(db.String, nullable=False)
timestamp = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
class NoteSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = Note
load_instance = True
sqla_session = db.session
include_fk = True
class Person(db.Model):
__tablename__ = "person"
id = db.Column(db.Integer, primary_key=True)
lname = db.Column(db.String(32), nullable=False)
fname = db.Column(db.String(32))
timestamp = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
notes = db.relationship(
Note,
backref="person",
cascade="all, delete, delete-orphan",
single_parent=True,
order_by="desc(Note.timestamp)",
)
class PersonSchema(ma.SQLAlchemyAutoSchema):
class Meta:
model = Person
load_instance = True
sqla_session = db.session
include_relationships = True
notes = fields.Nested(NoteSchema, many=True)
note_schema = NoteSchema()
person_schema = PersonSchema()
people_schema = PersonSchema(many=True)15.8 파일: source_code_final/notes.py
from config import db
from flask import abort, make_response
from models import Note, Person, note_schema
def read_one(note_id):
note = Note.query.get(note_id)
if note is not None:
return note_schema.dump(note)
else:
abort(404, f"Note with ID {note_id} not found")
def update(note_id, note):
existing_note = Note.query.get(note_id)
if existing_note:
update_note = note_schema.load(note, session=db.session)
existing_note.content = update_note.content
db.session.merge(existing_note)
db.session.commit()
return note_schema.dump(existing_note), 201
else:
abort(404, f"Note with ID {note_id} not found")
def delete(note_id):
existing_note = Note.query.get(note_id)
if existing_note:
db.session.delete(existing_note)
db.session.commit()
return make_response(f"{note_id} successfully deleted", 204)
else:
abort(404, f"Note with ID {note_id} not found")
def create(note):
person_id = note.get("person_id")
person = Person.query.get(person_id)
if person:
new_note = note_schema.load(note, session=db.session)
person.notes.append(new_note)
db.session.commit()
return note_schema.dump(new_note), 201
else:
abort(404, f"Person not found for ID: {person_id}")15.9 파일: source_code_final/people.py
from config import db
from flask import abort, make_response
from models import Person, people_schema, person_schema
def read_all():
people = Person.query.all()
return people_schema.dump(people)
def create(person):
new_person = person_schema.load(person, session=db.session)
db.session.add(new_person)
db.session.commit()
return person_schema.dump(new_person), 201
def read_one(person_id):
person = Person.query.get(person_id)
if person is not None:
return person_schema.dump(person)
else:
abort(404, f"Person with ID {person_id} not found")
def update(person_id, person):
existing_person = Person.query.get(person_id)
if existing_person:
update_person = person_schema.load(person, session=db.session)
existing_person.fname = update_person.fname
existing_person.lname = update_person.lname
db.session.merge(existing_person)
db.session.commit()
return person_schema.dump(existing_person), 201
else:
abort(404, f"Person with ID {person_id} not found")
def delete(person_id):
existing_person = Person.query.get(person_id)
if existing_person:
db.session.delete(existing_person)
db.session.commit()
return make_response(f"{person_id} successfully deleted", 200)
else:
abort(404, f"Person with ID {person_id} not found")