A lightweight per-transaction Python function queue for Flask

The premise: each time a certain API method is called within a Flask / SQLAlchemy app (a method that primarily involves saving something to the database), send various notifications, e.g. log to the standard logger, and send an email to site admins. However, the way the API works, is that several different methods can be forced to run in a single DB transaction, by specifying that SQLAlchemy only perform a commit when the last method is called. Ideally, no notifications should actually get triggered until the DB transaction has been successfully committed; and when the commit has finished, the notifications should trigger in the order that the API methods were called.

There are various possible solutions that can accomplish this, for example: a celery task queue, an event scheduler, and a synchronised / threaded queue. However, those are all fairly heavy solutions to this problem, because we only need a queue that runs inside one thread, and that lives for the duration of a single DB transaction (and therefore also only for a single request).

To solve this problem, I implemented a very lightweight function queue, where each queue is a deque instance, that lives inside flask.g, and that is therefore available for the duration of a given request context (or app context).

The code

The whole implementation really just consists of this one function:

from collections import deque

from flask import g


def queue_and_delayed_execute(
        queue_key, session_hash, func_to_enqueue,
        func_to_enqueue_ctx=None, is_time_to_execute_funcs=False):
    """Add a function to a queue, then execute the funcs now or later.

    Creates a unique deque() queue for each queue_key / session_hash
    combination, and stores the queue in flask.g. The idea is that
    queue_key is some meaningful identifier for the functions in the
    queue (e.g. 'banana_masher_queue'), and that session_hash is some
    identifier that's guaranteed to be unique, in the case of there
    being multiple queues for the same queue_key at the same time (e.g.
    if there's a one-to-one mapping between a queue and a SQLAlchemy
    transaction, then hash(db.session) is a suitable value to pass in
    for session_hash).

    Since flask.g only stores data for the lifetime of the current
    request (or for the lifetime of the current app context, if not
    running in a request context), this function should only be used for
    a queue of functions that's guaranteed to only be built up and
    executed within a single request (e.g. within a single DB
    transaction).

    Adds func_to_enqueue to the queue (and passes func_to_enqueue_ctx as
    kwargs if it has been provided). If is_time_to_execute_funcs is
    True (e.g. if a DB transaction has just been committed), then takes
    each function out of the queue in FIFO order, and executes the
    function.
    """
    # Initialise the set of queues for queue_key
    if queue_key not in g:
        setattr(g, queue_key, {})

    # Initialise the unique queue for the specified session_hash
    func_queues = getattr(g, queue_key)
    if session_hash not in func_queues:
        func_queues[session_hash] = deque()

    func_queue = func_queues[session_hash]

    # Add the passed-in function and its context values to the queue
    func_queue.append((func_to_enqueue, func_to_enqueue_ctx))

    if is_time_to_execute_funcs:
        # Take each function out of the queue and execute it
        while func_queue:
            func_to_execute, func_to_execute_ctx = (
                func_queue.popleft())
            func_ctx = (
                func_to_execute_ctx
                if func_to_execute_ctx is not None
                else {})
            func_to_execute(**func_ctx)

        # The queue is now empty, so clean up by deleting the queue
        # object from flask.g
        del func_queues[session_hash]

To use the function queue, calling code should look something like this:

from flask import current_app as app
from flask_mail import Message
from sqlalchemy.exc import SQLAlchemyError

from myapp.extensions import db, mail


def do_api_log_msg(log_msg):
    """Log the specified message to the app logger."""
    app.logger.info(log_msg)


def do_api_notify_email(mail_subject, mail_body):
    """Send the specified notification email to site admins."""
    msg = Message(
        mail_subject,
        sender=app.config['MAIL_DEFAULT_SENDER'],
        recipients=app.config['CONTACT_EMAIL_RECIPIENTS'])
    msg.body = mail_body

    mail.send(msg)

    # Added for demonstration purposes, not really needed in production
    app.logger.info('Sent email: {0}'.format(mail_subject))


def finalise_api_op(
        log_msg=None, mail_subject=None, mail_body=None,
        is_db_session_commit=False, is_app_logger=False,
        is_send_notify_email=False):
    """Finalise an API operation by committing and logging."""
    # Get a unique identifier for this DB transaction
    session_hash = hash(db.session)

    if is_db_session_commit:
        try:
            db.session.commit()

            # Added for demonstration purposes, not really needed in
            # production
            app.logger.info('Committed DB transaction')
        except SQLAlchemyError as exc:
            db.session.rollback()
            return {'error': 'error finalising api op'}

    if is_app_logger:
        queue_key = 'api_log_msg_queue'

        func_to_enqueue_ctx = dict(log_msg=log_msg)

        queue_and_delayed_execute(
            queue_key=queue_key, session_hash=session_hash,
            func_to_enqueue=do_api_log_msg,
            func_to_enqueue_ctx=func_to_enqueue_ctx,
            is_time_to_execute_funcs=is_db_session_commit)

    if is_send_notify_email:
        queue_key = 'api_notify_email_queue'

        func_to_enqueue_ctx = dict(
            mail_subject=mail_subject, mail_body=mail_body)

        queue_and_delayed_execute(
            queue_key=queue_key, session_hash=session_hash,
            func_to_enqueue=do_api_notify_email,
            func_to_enqueue_ctx=func_to_enqueue_ctx,
            is_time_to_execute_funcs=is_db_session_commit)

    return {'message': 'api op finalised ok'}

And that code can be called from a bunch of API methods like so:

def update_froggy_colour(
        froggy, colour, is_db_session_commit=False, is_app_logger=False,
        is_send_notify_email=False):
    """Update a froggy's colour."""
    froggy.colour = colour

    db.session.add(froggy)

    log_msg = ((
        'Froggy colour updated: {froggy.id}; new value: '
        '{froggy.colour}').format(froggy=froggy))
    mail_body = (
        'Froggy: {froggy.id}; new colour: {froggy.colour}'.format(
            froggy=froggy))

    result = finalise_api_op(
        log_msg=log_msg, mail_subject='Froggy colour updated',
        mail_body=mail_body, is_db_session_commit=is_db_session_commit,
        is_app_logger=is_app_logger,
        is_send_notify_email=is_send_notify_email)

    return result


def make_froggy_jump(
        froggy, jump_height, is_db_session_commit=False,
        is_app_logger=False, is_send_notify_email=False):
    """Make a froggy jump."""
    froggy.is_jumping = True
    froggy.jump_height = jump_height

    db.session.add(froggy)

    log_msg = ((
        'Made froggy jump: {froggy.id}; jump height: '
        '{froggy.jump_height}').format(froggy=froggy))
    mail_body = (
        'Froggy: {froggy.id}; jump height: {froggy.jump_height}'.format(
            froggy=froggy))

    result = finalise_api_op(
        log_msg=log_msg, mail_subject='Made froggy jump',
        mail_body=mail_body, is_db_session_commit=is_db_session_commit,
        is_app_logger=is_app_logger,
        is_send_notify_email=is_send_notify_email)

    return result

And the API methods can be called like so:

def make_froggy_brightpink_and_highjump(froggy):
    """Make a froggy bright pink and jumping high."""
    results = []

    result1 = update_froggy_colour(
        froggy, "bright_pink", is_app_logger=True)
    results.append(result1)

    result2 = make_froggy_jump(
        froggy, "50 metres", is_db_session_commit=True,
        is_app_logger=True, is_send_notify_email=True)
    results.append(result2)

    return results

If make_froggy_brightpink_and_highjump() is called from within a Flask app context, the app's log should include output that looks something like this:

INFO [2017-12-01 09:00:00] Committed DB transaction
INFO [2017-12-01 09:00:00] Froggy colour updated: 123; new value: bright_pink
INFO [2017-12-01 09:00:00] Made froggy jump: 123; jump height: 50 metres
INFO [2017-12-01 09:00:00] Sent email: Made froggy jump

The log output demonstrates that the desired behaviour has been achieved: first, the DB transaction finishes (i.e. the froggy actually gets set to bright pink, and made to jump high, in one atomic write operation); then, the API actions are logged in the order that they were called (first the colour was updated, then the froggy was made to jump); then, email notifications are sent in order (in this case, we only want an email notification sent for when the froggy jumps high – but if we had also asked for an email notification for when the froggy's colour was changed, that would have been the first email sent).

In summary

That's about all there is to this "task queue" implementation – as I said, it's very lightweight, because it only needs to be simple and short-lived. I'm sharing this solution, mainly to serve as a reminder that you shouldn't just use your standard hammer, because sometimes the hammer is disproportionately big compared to the nail. In this case, the solution doesn't need an asynchronous queue, it doesn't need a scheduled queue, and it doesn't need a threaded queue. (Although moving the email sending off to a celery task is a good idea in production; and moving the logging to celery would be warranted too, if it was logging to a third-party service rather than just to a local file.) It just needs a queue that builds up and that then gets processed, for a single DB transaction.

Post a comment