Source code for pisak.email.address_book
"""
Email address book management.
"""
from contextlib import contextmanager
from functools import wraps
from sqlalchemy import orm, func, Column, String, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from pisak import logger, exceptions, text_tools, dirs
_LOG = logger.get_logger(__name__)
_DB_ENGINE_URL = "sqlite:///" + dirs.HOME_EMAIL_ADDRESS_BOOK
_Base = declarative_base()
class _Contact(_Base):
"""
Object representing record in the address book database.
"""
__tablename__ = "address_book"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=True)
address = Column(String, unique=True)
photo = Column(String, nullable=True)
@contextmanager
def _establish_db_session():
engine = create_engine(_DB_ENGINE_URL)
_Base.metadata.create_all(engine)
session = orm.sessionmaker(autoflush=False)
session.configure(bind=engine)
db_session = session()
try:
yield db_session
db_session.commit()
except:
db_session.rollback()
raise
finally:
db_session.close()
def _db_session_handler(func):
"""
Decorator for handling all the database session operations
and database API related errors.
Provides an access to the database to the object that the decorated
function belongs to, by setting its `sess` field with the database
session instance.
:param func: function to be decorated.
"""
@wraps(func)
def wrapper(obj, *args, **kwargs):
try:
with _establish_db_session() as obj.sess:
ret = func(obj, *args, **kwargs)
obj.sess = None
return ret
except SQLAlchemyError as exc:
raise AddressBookError(exc) from exc
return wrapper
[docs]class AddressBookError(exceptions.PisakException):
"""
Address book unexpected condition, maybe problems when accessing the database.
"""
pass
[docs]class AddressBook(text_tools.Predictor):
"""
Book of mail contacts. Serves also as a predictor
for new message address inserts.
Internally, the entire address book content is stored
inside a database. Database session instance that is used by some of the
methods, each time is provided to them by the '_db_session_handler'
decorator. After executing one of these methods all changes
made to the session are commited and the session is closed.
"""
__gtype_name__ = "PisakEmailAddressBook"
def __init__(self):
super().__init__()
self.sess = None # database session instance
self.basic_content = self._book_lookup()
self.apply_props()
[docs] def do_prediction(self, text, position):
"""
Implementation of the `text_tools.Predictor` method.
:param text: text to feed the predictor with.
:param position: how many signs from the given text should ba taken.
"""
feed = text[0 : position]
self.content = self._book_lookup(feed)
self.notify_content_update()
@_db_session_handler
[docs] def get_contact(self, contact_id):
"""
Get single contact from the address book.
:param contact_id: identification number of a contact that should
be returned.
:return: single instance of a `_Contact` with the given id or None if
there was no match.
"""
contact = self.sess.query(_Contact).filter(
_Contact.id == contact_id).first()
self.sess.expunge_all()
return contact
@_db_session_handler
[docs] def get_contact_by_address(self, address):
"""
As each address in the address book is unique one can
query for a given contact by its address.
:param address: address of the contact.
:return: `_Contact` object with a given address
or None if nothing found.
"""
contact = self.sess.query(_Contact).filter(
_Contact.address == address).first()
self.sess.expunge_all()
return contact
@_db_session_handler
[docs] def get_count(self):
"""
Get number of contacts in the address book.
:return: integer with number of contacts in the address book
"""
return self.sess.query(func.count(_Contact.id)).scalar()
@_db_session_handler
[docs] def get_all_contacts(self):
"""
Retrieve all records from the address book.
:return: list of all contacts.
"""
contacts = self.sess.query(_Contact).all()
self.sess.expunge_all()
return contacts
@_db_session_handler
[docs] def search_contacts(self, feed):
"""
Look for all the contacts that contain the given feed in their name
or address, sort them properly and return as a list.
:param feed: string that the search will be based on.
:return: list of all the matching contacts, sorted properly.
"""
contacts = sorted(self.sess.query(_Contact).filter(feed in
_Contact.address | (_Contact.name & feed in _Contact.name)).all(),
key=lambda contact: (contact.address if feed in
contact.address else contact.name).index(feed))
self.sess.expunge_all()
return contacts
@_db_session_handler
[docs] def add_contact(self, contact):
"""
Add new contact to the address book. Contact must contain 'address' key
and can contain the following keys: 'name' and 'photo'.
:param contact: dictionary with new contact.
:return: True on successfull update of the book with the given
contact or False otherwise, for example when address same as the
one of the given contact has already been in the book.
"""
address = contact["address"]
if not self.sess.query(_Contact).filter(
_Contact.address == address).first():
self.sess.add(
_Contact(
name=contact.get("name"),
address=address,
photo=contact.get("photo")))
return True
else:
_LOG.warning(
"Contact with address {} already in the "
"address book.".format(address))
return False
@_db_session_handler
[docs] def remove_contact(self, contact_id):
"""
Remove contact from the book. If the book does not contain
the given contact then nothing happens.
:param contact: id of the contact to be removed
"""
contact = self.sess.query(_Contact).filter(
_Contact.id == contact_id).first()
if contact:
self.sess.delete(contact)
else:
_LOG.warning("Trying to delete not existing "
"contact with id: {}.".format(contact_id))
[docs] def edit_contact_name(self, contact_id, name):
"""
Edit name of a contact.
:param contact_id: id of the contact
:param name: new name
"""
self._edit_contact(contact_id, "name", name)
[docs] def edit_contact_photo(self, contact_id, photo):
"""
Edit photo path for a contact.
:param contact_id: id of the contact
:param photo: path to the new photo
"""
self._edit_contact(contact_id, "photo", photo)
[docs] def edit_contact_address(self, contact_id, address):
"""
Edit email address of a contact.
:param contact_id: id of the contact
:param name: new email address
"""
self._edit_contact(contact_id, "address", address)
@_db_session_handler
def _edit_contact(self, contact_id, key, value):
contact = self.sess.query(_Contact).filter(
_Contact.id == contact_id).first()
if contact:
setattr(contact, key, value)
@_db_session_handler
def _book_lookup(self, feed=""):
match = self.sess.query(_Contact.address).filter(
_Contact.address.startswith(feed) |
(_Contact.name & _Contact.name.startswith(feed))).order_by(
_Contact.address).all()
self.sess.expunge_all()
return match