Queue

Installation

pip install litequeue

Use cases

You can use litequeue to implement a persistent queue. It also records timing metrics for each message, and the API for marking a message as done lets you specify which message_id to mark.

Since it's all based on SQLite / SQL, it is easily extendable.

Tasks/messages are always passed as strings, so you can use JSON data as messages. Messages are interpreted as tasks, so after you pop a message, you need to mark it as done when you finish processing it.
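Because messages are plain strings, structured tasks need to be serialized before they are put in the queue and parsed after they are popped. A minimal sketch using the standard json module; encode_task and decode_task are hypothetical helper names, not part of litequeue:

```python
import json


def encode_task(payload: dict) -> str:
    """Serialize a task payload to a JSON string (hypothetical helper)."""
    return json.dumps(payload, sort_keys=True)


def decode_task(message: str) -> dict:
    """Parse a JSON message string back into a dictionary (hypothetical helper)."""
    return json.loads(message)


message = encode_task({"action": "resize", "image_id": 42})

# With a queue at hand, q.put(message) would store this string and
# q.pop()["message"] would hand the same string back for decoding.
payload = decode_task(message)
```

Sorting the keys is optional; it only makes the stored strings easier to compare by eye.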

Differences with a normal Python queue.Queue

  • Persistence
  • Different API to mark messages as done (you tell it which message_id to set as done)
  • Timing metrics. As long as messages are still in the queue or not pruned, you can see how long they have been there or how long they took to finish.
  • Easy to extend using SQL
  • Messages/elements/tasks in the queue are always strings

Messages data

  • message (text): the message itself, it must be a string
  • message_id (text): a random string ID generated when the message is put in the queue.
  • status (int): status of the message. 0 = free, 1 = locked (the message is being processed), 2 = done (the message has been processed, and it can be deleted).
  • in_time (int): the Unix epoch time when the message was inserted in the queue
  • lock_time (int): the Unix epoch time when the message was locked for processing
  • done_time (int): the Unix epoch time when the message was marked as done/processed
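The fields above can be written down as Python types, which helps when passing popped messages around in typed code. This is only a sketch: Status and MessageRecord are hypothetical names chosen for illustration, and litequeue may expose its own types.

```python
from enum import IntEnum
from typing import Optional, TypedDict


class Status(IntEnum):
    """Status codes as described in the list above (hypothetical name)."""
    FREE = 0
    LOCKED = 1
    DONE = 2


class MessageRecord(TypedDict):
    """Shape of a message dictionary (hypothetical name)."""
    message: str
    message_id: str
    status: int
    in_time: int
    lock_time: Optional[int]  # None until the message is locked
    done_time: Optional[int]  # None until the message is marked as done


# Sample values for illustration only
record: MessageRecord = {
    "message": "world",
    "message_id": "44cbc85f12b62891aa596b91f14183e5",
    "status": 0,
    "in_time": 1612711138,
    "lock_time": None,
    "done_time": None,
}

is_free = record["status"] == Status.FREE
```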

Architecture

SQLite does not have row-level locks, so the SELECT ... FOR UPDATE SKIP LOCKED pattern is not available. The current litequeue implementation marks a message as locked first and then returns it. The application is in charge of setting it as done. The problem with this approach is that the application could crash while processing the message/task, in which case the message would stay marked as locked forever. The messages table has in_time and lock_time columns (both are Unix epochs). To counter the lock + crash problem, some logic could be implemented like:

time_locked = current_time - lock_time

if time_locked > threshold:
    delete/modify/add_again(message)

With that pattern, you can check all the tasks that have been locked for more than X seconds and do whatever you need with them.
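As a rough illustration of that recovery pattern, the sketch below works directly with sqlite3 on a stand-in table. The table name messages, its layout (modelled on the fields listed under "Messages data"), and the release_stale_locks helper are assumptions made for this example; they are not litequeue's actual schema or API.

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")

# Stand-in table (assumed layout, not litequeue's real schema)
conn.execute(
    """
    CREATE TABLE messages (
        message    TEXT NOT NULL,
        message_id TEXT PRIMARY KEY,
        status     INTEGER NOT NULL,
        in_time    INTEGER NOT NULL,
        lock_time  INTEGER,
        done_time  INTEGER
    )
    """
)

now = int(time.time())
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("fresh", "id-1", 1, now - 5, now - 2, None),      # locked 2 seconds ago
        ("stale", "id-2", 1, now - 900, now - 600, None),  # locked 10 minutes ago
        ("waiting", "id-3", 0, now - 1, None, None),       # never locked
    ],
)


def release_stale_locks(conn, threshold_seconds, now=None):
    """Set messages locked for longer than the threshold back to free (status 0).

    Returns the number of messages that were released.
    """
    now = int(time.time()) if now is None else now
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "UPDATE messages SET status = 0, lock_time = NULL "
            "WHERE status = 1 AND ? - lock_time > ?",
            (now, threshold_seconds),
        )
    return cur.rowcount


released = release_stale_locks(conn, threshold_seconds=300, now=now)
```

Releasing the lock is only one option; the same WHERE clause could drive a DELETE or a re-insert instead, depending on whether a task is safe to run twice.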

Examples

Initialize a queue and put 4 messages. Each time you put a message in the queue, it returns the rowid of the message you just inserted.

Put messages

from litequeue import SQLQueue

q = SQLQueue(":memory:")

q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")
# 4  <- ID of the last row modified

Pop messages

Now we can use the q.pop() method to retrieve the next message. For each message, a random message_id will be generated on creation. The .pop() method returns a dictionary with the message's data.

q.pop()
# {'message': 'hello', 'message_id': '7da620ac542acd76c806dbcf00218426', ...}

Printing the queue

The queue object implements a __repr__ method, so you can use print(q) to check the contents.

print(q)


#    SQLQueue(Connection='sqlite3.Connection(...)', items=[{'done_time': None,
#      'in_time': 1612711137,
#      'lock_time': 1612711137,
#      'message': 'hello',
#      'status': 1,
#      'message_id': '7da620ac542acd76c806dbcf00218426'},
#       ...

Message processing

If we pop all the messages and try to pop another one, it will return None.

# pop remaining
for _ in range(3):
    q.pop()


assert q.pop() is None

Now we will insert 4 more messages. The last put() returns 8, which means the last message inserted has a rowid of 8. Then we will pop() a message and save it in a variable called task. Tasks are returned as dictionaries.

q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")

# 8 <- ID of the last row modified

task = q.pop()

assert task["message"] == "hello"

Peek a message

With the q.peek() method you can have a look at the next message to be processed. The method will return the message, but it won't pop it from the queue. Since we have already popped the "hello" message, the peek() method will return the "world" message.

q.peek()


#    {'message': 'world',
#     'message_id': '44cbc85f12b62891aa596b91f14183e5',
#     'status': 0,
#     'in_time': 1612711138,
#     'lock_time': None,
#     'done_time': None}


# next one that is free
assert q.peek()["message"] == "world"

# status = 0 = free
assert q.peek()["status"] == 0

Now we'll go back to the message we previously popped from the queue. We will mark it as done with the q.done(message_id) method. After that, we can use the q.get(message_id) method to check that it has been marked as done ('status' = 2).

task["message"], task["message_id"]

# ('hello', 'c9b9ef76e3a77cc66dd749d485613ec1')

q.done(task["message_id"])

# 8 <- ID of the last row modified

q.get(task["message_id"])

#    {'message': 'hello',
#     'message_id': 'c9b9ef76e3a77cc66dd749d485613ec1',
#     'status': 2,    <---- status is now 2 (DONE)
#     'in_time': 1612711138,
#     'lock_time': 1612711138,
#     'done_time': 1612711138}


already_done = q.get(task["message_id"])

# status = 2 = done
assert already_done["status"] == 2

Message timing data

We can use the timing data that is recorded automatically when a message is created, locked, and marked as done.

in_time = already_done["in_time"]
lock_time = already_done["lock_time"]
done_time = already_done["done_time"]

assert done_time >= lock_time >= in_time

print(
    f"Task {already_done['message_id']} took {done_time - lock_time} seconds to get done and was in the queue for {done_time - in_time} seconds"
)

# Task c9b9ef76e3a77cc66dd749d485613ec1 took 0 seconds to get done and was in the queue for 0 seconds
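Since the timestamps are ordinary integer columns, the same figures can also be aggregated in SQL. The sketch below computes average waiting and processing times over finished messages on a stand-in table; the table name messages and its layout are assumptions for the example, not litequeue's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table (assumed layout, not litequeue's real schema)
conn.execute(
    "CREATE TABLE messages ("
    "message TEXT, message_id TEXT, status INTEGER, "
    "in_time INTEGER, lock_time INTEGER, done_time INTEGER)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("a", "id-a", 2, 100, 104, 110),   # done: waited 4 s, processed in 6 s
        ("b", "id-b", 2, 100, 106, 108),   # done: waited 6 s, processed in 2 s
        ("c", "id-c", 1, 101, 107, None),  # still locked, excluded below
    ],
)

done_count, avg_wait, avg_processing = conn.execute(
    "SELECT COUNT(*), "
    "       AVG(lock_time - in_time), "    # time spent waiting in the queue
    "       AVG(done_time - lock_time) "   # time spent being processed
    "FROM messages WHERE status = 2"
).fetchone()

print(f"{done_count} done, waited {avg_wait} s, processed in {avg_processing} s on average")
```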

Check queue size

We can get the queue size using the q.qsize() method. It ignores finished items, so the real number of rows in the SQLite database can be bigger than the number returned.

To remove the messages marked as done ('status' = 2), use the q.prune() method. This will remove those messages permanently.

assert q.qsize() == 7

next_one_msg = q.peek()["message"]
next_one_id = q.peek()["message_id"]

task = q.pop()

assert task["message"] == next_one_msg
assert task["message_id"] == next_one_id

# remove finished items
q.prune()

print(q)


#    SQLQueue(Connection='sqlite3.Connection(...)', items=[{'done_time': None,
#      'in_time': 1612711137,
#      'lock_time': 1612711137,
#      'message': 'hello',
#      'status': 1,
#      'message_id': '7da620ac542acd76c806dbcf00218426'},
#     {'done_time': None,
#      'in_time': 1612711137,
#      'lock_time': 1612711137,
#      'message': 'world',
#      'status': 1,
#      'message_id': 'a593292cfc8d2f3949eab857eafaf608'},
#     {'done_time': None,
#      'in_time': 1612711137,
#      'lock_time': 1612711137,
#      'message': 'foo',
#      'status': 1,
#      'message_id': '17e843a29770df8438ad72bbcf059bf5'},
#     ...
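To make the difference between the reported size and the number of stored rows concrete, here is a sketch of what size and prune queries could look like in plain SQL. The table name messages and the two helper functions are assumptions for illustration; litequeue's own queries may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Stand-in table (assumed layout, not litequeue's real schema)
conn.execute("CREATE TABLE messages (message TEXT NOT NULL, status INTEGER NOT NULL)")
conn.executemany(
    "INSERT INTO messages VALUES (?, ?)",
    [("hello", 2), ("world", 1), ("foo", 0), ("bar", 0)],  # done, locked, free, free
)


def count_rows(conn):
    """Number of rows physically stored in the table."""
    return conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]


def count_unfinished(conn):
    """Size that ignores finished messages (status 2)."""
    return conn.execute("SELECT COUNT(*) FROM messages WHERE status != 2").fetchone()[0]


def prune_done(conn):
    """Permanently delete finished messages and return how many were removed."""
    with conn:
        return conn.execute("DELETE FROM messages WHERE status = 2").rowcount


rows_before, size_before = count_rows(conn), count_unfinished(conn)
removed = prune_done(conn)
rows_after, size_after = count_rows(conn), count_unfinished(conn)
```

Pruning shrinks the table but leaves the reported size unchanged, because the deleted rows were already excluded from it.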

Set a max queue size

If you specify a maxsize when you initialize the queue, it will create a trigger that raises an error when that size is reached. In Python, this surfaces as an sqlite3.IntegrityError exception.

from string import ascii_lowercase
from random import choice


def random_string(string_length=10):
    """Generate a random string of fixed length """
    return "".join(choice(ascii_lowercase) for i in range(string_length))

q = SQLQueue(":memory:", maxsize=50)

for i in range(50):
    q.put(random_string(20))

assert q.qsize() == 50

An error is raised when the queue has reached its size limit.

import sqlite3

try:
    q.put(random_string(20))
except sqlite3.IntegrityError: # max len reached
    print("test pass")

# test pass
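For readers curious how a size limit can be enforced inside SQLite itself, the following sketch creates a BEFORE INSERT trigger on a stand-in table. The table name, the trigger name, and the choice to count only free messages (status 0) are assumptions for this example and may not match what litequeue actually creates.

```python
import sqlite3

MAXSIZE = 3  # small limit to keep the example short

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (message TEXT NOT NULL, status INTEGER NOT NULL DEFAULT 0)"
)

# Trigger bodies cannot use bound parameters, so the integer limit is
# formatted into the statement after being forced through int().
conn.execute(
    f"""
    CREATE TRIGGER maxsize_control
    BEFORE INSERT ON messages
    WHEN (SELECT COUNT(*) FROM messages WHERE status = 0) >= {int(MAXSIZE)}
    BEGIN
        SELECT RAISE(ABORT, 'Max queue length reached');
    END
    """
)

for text in ("a", "b", "c"):
    conn.execute("INSERT INTO messages (message) VALUES (?)", (text,))

try:
    conn.execute("INSERT INTO messages (message) VALUES (?)", ("d",))
    overflow_rejected = False
except sqlite3.IntegrityError:  # RAISE(ABORT, ...) is reported as a constraint error
    overflow_rejected = True

# Locking one message frees a slot under the assumed counting rule
conn.execute("UPDATE messages SET status = 1 WHERE message = 'a'")
conn.execute("INSERT INTO messages (message) VALUES (?)", ("d",))
total_rows = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
```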

When we pop an item, we can add another one. Note that q.put() returns the rowid of the latest inserted message; it does not represent the current queue size.

q.pop()

#    {'message': 'aktabyjadzrsohlitnei',
#     'message_id': '08b201c31099a296ef37f23b5257e5b6'}

# Now we can put another message without error
q.put("hello")

# 51

Empty queues

We can check if a queue is empty using the q.empty() method.

# Check if a queue is empty
assert not q.empty()

q2 = SQLQueue(":memory:")

assert q2.empty()

Benchmarks

Inserting items in the queue.

import gc

In-memory SQL queue

q = SQLQueue(":memory:", maxsize=None)

gc.collect()

# %%timeit -n10000 -r7

q.put(random_string(20))

# 40.2 µs ± 12 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

q.qsize()

# 70000

Standard Python queue

from queue import Queue

q = Queue()

gc.collect()

# %%timeit -n10000 -r7

q.put(random_string(20))

# 21.9 µs ± 3.57 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Persistent SQL queue

q = SQLQueue("test.queue", maxsize=None)

gc.collect()

# %%timeit -n10000 -r7

q.put(random_string(20))

# 161 µs ± 5.36 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

assert q.conn.isolation_level is None

Creating, popping and setting messages as done.

q = Queue()

gc.collect()

# %%timeit -n10000 -r7

tid = random_string(20)

q.put(tid)

q.get()

q.task_done()

# 27 µs ± 3.69 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

q = SQLQueue(":memory:", maxsize=None)

gc.collect()

# %%timeit -n10000 -r7

tid = random_string(20)

q.put(tid)

task = q.pop()

q.done(task["message_id"])

# 80.2 µs ± 4.02 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
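The timings above come from the IPython %%timeit magic. Outside IPython, a comparable measurement can be sketched with the standard timeit module. The example below only times the standard queue.Queue, so that it runs without litequeue installed; bench_put is a hypothetical helper name, and the numbers it prints will differ from machine to machine.

```python
import timeit
from queue import Queue
from random import choice
from string import ascii_lowercase


def random_string(string_length=10):
    """Generate a random lowercase string of fixed length."""
    return "".join(choice(ascii_lowercase) for _ in range(string_length))


def bench_put(number=10_000, repeat=7):
    """Return the mean time in seconds of one put() call, for each repeat."""
    q = Queue()  # unbounded, so put() never blocks
    timings = timeit.repeat(
        lambda: q.put(random_string(20)), number=number, repeat=repeat
    )
    return [total / number for total in timings]


# Smaller counts than in the runs above, to keep the example quick
per_call = bench_put(number=1_000, repeat=3)
print(f"best: {min(per_call) * 1e6:.1f} µs per put()")
```

As in the runs above, each measurement includes the cost of generating the random string.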

SQLite version 3.35.0

SQLite introduced the RETURNING clause in version 3.35.0. If the SQLite library used in your application supports it, the RETURNING clause will be used. The RETURNING clause makes popping messages a lot faster:

from litequeue import SQLQueue
import gc

from string import ascii_lowercase, printable
from random import choice


def random_string(string_length=10, fuzz=False, space=False):
    """Generate a random string of fixed length"""
    letters = ascii_lowercase
    letters = letters + " " if space else letters
    if fuzz:
        letters = printable
    return "".join(choice(letters) for i in range(string_length))

q = SQLQueue("pop_bench.db", maxsize=None)

###
# RETURNING
###
q.pop = q._pop_returning

gc.collect()

for _ in range(10000):
    tid = random_string(60)

    q.put(tid)

# benchmark block
for _ in range(8000):
    task = q.pop()

# time: 2.15 s


q = SQLQueue("pop_bench.db", maxsize=None)

###
# custom locking logic in a transaction
###
q.pop = q._pop_transaction

gc.collect()

for _ in range(10000):
    tid = random_string(60)

    q.put(tid)


# benchmark block
for _ in range(8000):
    task = q.pop()

# time: 9.08 s
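To illustrate why RETURNING helps, the sketch below contrasts the two approaches on a stand-in table: with RETURNING, locking and reading the message happen in one statement, while the fallback needs a SELECT and an UPDATE wrapped in an explicit transaction. The table layout and the pop_message function are assumptions for this example; they are not the code behind _pop_returning or _pop_transaction.

```python
import sqlite3
import time

# isolation_level=None puts the connection in autocommit mode
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE messages ("
    "message TEXT NOT NULL, status INTEGER NOT NULL DEFAULT 0, lock_time INTEGER)"
)
for text in ("hello", "world"):
    conn.execute("INSERT INTO messages (message) VALUES (?)", (text,))

HAS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)


def pop_message(conn):
    """Lock the oldest free message and return its text, or None if none is free."""
    now = int(time.time())
    if HAS_RETURNING:
        # One statement: pick the row, lock it, and read it back
        rows = conn.execute(
            "UPDATE messages SET status = 1, lock_time = ? "
            "WHERE rowid = (SELECT rowid FROM messages "
            "               WHERE status = 0 ORDER BY rowid LIMIT 1) "
            "RETURNING message",
            (now,),
        ).fetchall()
        return rows[0][0] if rows else None

    # Fallback for older SQLite: SELECT then UPDATE inside a write transaction
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT rowid, message FROM messages "
            "WHERE status = 0 ORDER BY rowid LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE messages SET status = 1, lock_time = ? WHERE rowid = ?",
                (now, row[0]),
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else row[1]


first = pop_message(conn)
second = pop_message(conn)
third = pop_message(conn)  # nothing left to pop
```

Checking sqlite3.sqlite_version_info reports the version of the SQLite library that Python is linked against, which is what determines whether RETURNING is available.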

Disclaimer

I'm still designing the internal structure of litequeue, the message metadata, and how messages are created / locked / deleted, so changes can be expected. However, the main functionality and the exposed API of put() / pop() / done() / get() should stay the same. The changes will be mostly internal or will add new methods to the queue. Feedback is welcome!

Alternatives

  • Huey: Huey is a task queue implemented in Python, with multiple backends (Redis/SQLite/in-memory). Huey is a more "complete" task queue and includes a lot of functionality that is missing from litequeue. Its scope is much bigger: it lets you decorate functions, run tasks periodically, etc. litequeue tries to "just" be a primitive queue implementation on which to build other tools. Even though it's written in Python, litequeue is easy to port to other programming languages, so multiple processes can interact with the same persistent queue.