Running unique tasks in Celery

At my day job, we had a requirement to make one of the API endpoints fast. The endpoint was computation-intensive and was taking a lot of time. After profiling the endpoint using django-silk, we came to the conclusion that SQL wasn’t the issue.

One possible solution was to move the computation into a Celery worker, put the results in a separate table, and serve the web requests from that table directly. The computation for a particular loan was triggered by a couple of events that were easy to detect. An added benefit for us was that we had other use cases for the table.

The uniqueness part comes in because the end result of computing multiple times is the same as computing once (the task is idempotent). If two workers start the computation for the same loan, they will produce the same results and both will end up updating the database with them. To maintain uniqueness, we used locking with redis.

While working on this, I came across two solutions:

  1. Discarding the task on the worker

In this method, we decorate the task with a function that checks whether it can acquire the lock for the loan. If it can’t acquire the lock, a worker is already working on this loan and we don’t need to compute again, so the task doesn’t run any further. Here we are using a transaction-aware task (it should work with the Task class exposed by Celery as well but, for our use case, we need TransactionAwareTask; https://gist.github.com/tapanpandita/46d2e2f63c7425547a865cb6298a172f )

def should_compute_for_loan(key):
    def decorated_func(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            """
                Apply a lock on a key and checks if we should go ahead
                and run the celery task
            """
            has_lock, return_value = False, False
            loan_id = args[0]
            lock = cache.lock(key.format(loan_id=loan_id), timeout=600)
            try:
                has_lock = lock.acquire(blocking=False)
                if has_lock:
                    return_value = func(*args, **kwargs)
            finally:
                if has_lock:
                    lock.release()
            return return_value

        return inner

    return decorated_func

@app.task(base=TransactionAwareTask)
@should_compute_for_loan(key='heavy_computation:{loan_id}')
def recompute_heavy_computation(loan_id):
    # the heavy computation itself goes here
    ...

The shortcoming of this method is that even though the computation takes place only once, we still have to publish the task, which means the queue still gets flooded.

  2. Discarding the task in Django

class TransactionAwareUniqueTask(TransactionAwareTask):
    '''
        Makes sure that a task is computed only once using locking.
        The task itself is triggered by django as a callback when
        the transaction is committed successfully.
        Usage: subclassed_task.delay(some_args, key='some-namespacing-id-for-uniqueness')
    '''
    abstract = True

    def delay(self, *args, **kwargs):
        '''
            Makes a lock using redis for given key
        '''

        key = kwargs['key']
        lock = cache.lock(key, timeout=600)
        has_lock = lock.acquire(blocking=False)
        if has_lock:
            LOGGER.debug("Lock acquired: %s", key)
            super(TransactionAwareUniqueTask, self).delay(*args, **kwargs)
        else:
            LOGGER.debug("Can not get lock: %s", key)

and use it like:

@app.task(base=TransactionAwareUniqueTask, acks_late=True)
def recompute_heavy_computation(loan_id, key=None):
    # the heavy computation itself goes here
    ...

I have used acks_late instead of the default because we want to ack RabbitMQ when the task has finished, not when it has been received. This means that if the worker dies after picking up the job, RabbitMQ will make sure the task is not removed from the queue.

The lock still needs to be released by the worker because, once the computation is complete, you want to make sure that a recomputation within the timeout period of the lock is still possible. This can be achieved using the task_postrun signal provided by Celery. It also fires when the task fails to run for some reason, i.e. when an exception is raised inside the task.

@task_postrun.connect(sender=recompute_heavy_computation)
def release_the_lock(*args, **kwargs):
    """
        Release the redis lock
    """

    key = kwargs['kwargs']['key']
    LOGGER.debug("About to delete: %s", key)
    cache.client.delete(key)
    LOGGER.debug("Deleted lock: %s", key)

This deletes the key instead of using the locking interface, which is one thing that bothers me a little, but I couldn’t find a better solution: a lock has to be acquired before it can be released, and you can’t pass the lock object to the worker because it isn’t JSON serialisable. It feels a bit hacky, but it sure worked.


Detecting USB Insertion/Removal using Python

In my last blog post, I wrote about how usbmount can be used to automatically mount a USB drive. Today I had to detect this mounting from Python and show in a web application that a USB drive had been inserted. I used pyudev for this and ran it in a separate thread so that the main application thread is not affected by pyudev’s monitor loop.

class USBDetector():
    ''' Monitor udev for detection of usb '''

    def __init__(self):
        ''' Initiate the object '''
        thread = threading.Thread(target=self._work)
        thread.daemon = True
        thread.start()

    def _work(self):
        ''' Runs the actual loop to detect the events '''
        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.monitor.filter_by(subsystem='usb')
        # this is module level logger, can be ignored
        LOGGER.info("Starting to monitor for usb")
        self.monitor.start()
        for device in iter(self.monitor.poll, None):
            LOGGER.info("Got USB event: %s", device.action)
            if device.action == 'add':
                # some function to run on insertion of usb
                self.on_created()
            elif device.action == 'remove':
                # some function to run on removal of usb
                self.on_deleted()

Since usbmount mounts the filesystem of the USB drive at particular locations (/media/usb0 to /media/usb7), you can easily check for files in those folders and do whatever you wish with them.
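
Checking those mount points takes only a few lines of stdlib Python. A small sketch (the function name and the mount-point list are my own, based on usbmount’s default /media/usb0 to /media/usb7):

```python
import os

def list_usb_files(mount_points=None):
    ''' Return a {mount_point: [files]} dict for usbmount directories with content. '''
    if mount_points is None:
        mount_points = ['/media/usb%d' % i for i in range(8)]
    found = {}
    for point in mount_points:
        # Missing or empty directories simply mean nothing is mounted there.
        if os.path.isdir(point) and os.listdir(point):
            found[point] = os.listdir(point)
    return found

print(list_usb_files())
```

An empty dict means no usbmount directory currently has files in it.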

Using the above code is as easy as creating an object of the USBDetector class.

Detecting and Automatically Mounting Pendrive on Raspbian Stretch Lite

In the Lite version of Raspbian Stretch, I didn’t expect that I would have to mount a pendrive manually. I had a use case in which automatic mounting was a necessity, and that is how I came across usbmount.

You can install it on Raspbian using apt-get.

sudo apt-get install usbmount

Usbmount offers a lot of configuration options in /etc/usbmount/usbmount.conf but, for my use case, the defaults were good enough.

This wasn’t enough though: usbmount wasn’t detecting the USB stick. On searching, I found out that I wasn’t the only one having this problem. For usbmount to work on Raspbian Stretch, you have to manually edit the systemd unit file for udevd, located at /lib/systemd/system/systemd-udevd.service, and change MountFlags=slave to MountFlags=shared, as someone commented here.
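
For reference, after the edit the [Service] section of /lib/systemd/system/systemd-udevd.service should contain the line below (assuming the stock Raspbian Stretch unit file); you will need a systemctl daemon-reload and a reboot, or a restart of systemd-udevd, for it to take effect:

```
[Service]
MountFlags=shared
```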

Setting up postgres for python development

PostgreSQL is my database of choice, mainly because almost all (if not all) web applications in Fedora infrastructure use postgres and I have been using the same. But every time I set up a fresh system, I face some issues getting started, almost all of them with client authentication. This post is here in case the same happens in the future.

  1. Install the dependencies:
    1. sudo dnf install postgresql postgresql-devel postgresql-server postgresql-contrib
  2. Initialise the database and create a db user other than postgres:
    1. sudo -i
    2. su - postgres
    3. initdb
    4. Start the database using command: pg_ctl -D /var/lib/pgsql/data -l logfile start
    5. psql -U postgres
    6. create database pagure;
    7. create user pagure with password 'pagure';
    8. grant all privileges on database pagure to pagure;
    9. show hba_file;
    10. Open that file and change the auth method to trust.
  3. If you run into "unregistered authentication agent for unix-process", see https://techglimpse.com/solution-polkitd-postgresql-start-error/
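
For a local development box, the relevant lines in pg_hba.conf end up looking something like this (trust disables password checks entirely, so use it only for local development, and restart postgres after changing it):

```
# TYPE  DATABASE        USER            ADDRESS                 METHOD
local   all             all                                     trust
host    all             all             127.0.0.1/32            trust
host    all             all             ::1/128                 trust
```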

Tornado with systemd

Unlike Django or Flask, Tornado is not WSGI based. It also ships with its own HTTP server, and running the Tornado server in production is not very different from running it during development. You may want to run the server in the background though.

The systemd config file for tornado server that i am currently using is:

[Unit]
Description=Tornado server service file
After=rabbitmq-server.service

[Service]
ExecStart=/usr/bin/python /path/to/tornado/server/main/method.py
Restart=on-abort

[Install]
WantedBy=multi-user.target

For my use case, it should start after rabbitmq-server has started. After putting this file in /etc/systemd/system/, you can start the server with:

sudo systemctl start servicename.service

Its status can be checked using:

sudo systemctl status servicename.service

For people with trust issues, you can use curl or visit the site itself to reconfirm. In my case, the Tornado server was a websocket server, not plain HTTP, so I had to pass some additional headers to curl, which I got from here.

curl --include \
     --no-buffer \
     --header "Connection: Upgrade" \
     --header "Upgrade: websocket" \
     --header "Host: 127.0.0.1:8080" \
     --header "Origin: http://127.0.0.1:8080" \
     --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
     --header "Sec-WebSocket-Version: 13" \
     http://127.0.0.1:8080/ws

That’s it.

Token based Websocket Authentication

At my day job, I had to implement websockets, and thus authentication of the websocket connection came up. There were two different types of clients, but authentication for the browser client was the biggest headache.

For a normal HTTP request, we use cookies to authenticate a user. The websocket protocol itself doesn’t define how a websocket connection should be authenticated. According to RFC 6455, section 10.5:

   This protocol doesn't prescribe any particular way that servers can
   authenticate clients during the WebSocket handshake. The WebSocket
   server can use any client authentication mechanism available to a
   generic HTTP server, such as cookies, HTTP authentication, or TLS
   authentication.

So the first thing that comes to mind is: why not use the same cookies that we use for an HTTP request? I thought the same too but eventually decided to use token-based authentication.

Why not use the cookie?
We are using Django for our main web application. Django, and WSGI-based Python frameworks in general, are not built for long-lived connections, so for websockets we are using Tornado.

In Django, by default, cookies are not readable by JavaScript. They are marked as HTTP-only, so the browser uses the cookie only for making http/https requests to the origin server. This can be turned off using:

SESSION_COOKIE_HTTPONLY = False

[Screenshot: browser cookies with SESSION_COOKIE_HTTPONLY = True]

The above image is what you see when you have SESSION_COOKIE_HTTPONLY = True.

[Screenshot: browser cookies with SESSION_COOKIE_HTTPONLY = False]

This is when you set SESSION_COOKIE_HTTPONLY to False. The `sessionid` cookie is the one the server will use to identify the user.

The main benefit of not exposing the sessionid to JavaScript is that if someone successfully performs an XSS attack, they won’t be able to hijack the session. Setting the cookie to not be HTTP-only would have been the easiest option for me but, as it is not recommended, I went for token-based authentication.

Token based authentication

For token-based authentication to work, the Django server has to generate a token on every request (for the endpoints that require a websocket connection). Once the browser gets the token, it can initiate a websocket connection to the Tornado server, sending the token along while opening the connection. On the server side, there should be a common store where Django can put the token and Tornado can retrieve it to verify the request.
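
Stripped of Django, Tornado and redis, the contract between the two servers is just a token round trip through a shared store. Here is a minimal sketch of that contract, with a plain dict standing in for redis (the token format and the 20-second validity window are taken from the code later in this post; the rest is my own stand-in):

```python
import json
import uuid
import datetime

store = {}  # stand-in for redis, shared between Django and Tornado in the real setup

def create_ws_token(username):
    ''' Django side: mint a single-use token and record who asked for it and when. '''
    token = 'wstoken' + uuid.uuid4().hex
    issued = datetime.datetime.utcnow().strftime("%d:%m:%Y:%H:%M:%S")
    store[token] = json.dumps({'user': username, 'time': issued})
    return token

def authenticate(token):
    ''' Tornado side: accept the token once, within 20 seconds of issue. '''
    payload = store.pop(token, None)  # single use: gone after the first lookup
    if payload is None:
        return False
    payload = json.loads(payload)
    issued = datetime.datetime.strptime(payload['time'], "%d:%m:%Y:%H:%M:%S")
    if datetime.datetime.utcnow() - issued <= datetime.timedelta(seconds=20):
        return payload['user']
    return False

token = create_ws_token('alice')
print(authenticate(token))  # alice
print(authenticate(token))  # False, tokens are single use
```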

Generating the token on the server side for multiple views can be done with a Python decorator. But if you make a decorator and want to pass a variable on to the wrapped function, you have to add an extra parameter to the function to receive the variable’s value. That would have meant a lot of changes across the project, so instead I went on to make a project-wide template tag.

Making a project-wide template tag in Django for creating tokens

  1. Create a folder named templatetags under the project's main directory and create two files in it: __init__.py and create_ws_tokens.py
  2. In create_ws_tokens.py, you can put something like this:
    import uuid
    import json
    import datetime
    
    from django import template
    from project_name import redis_conn
    
    register = template.Library()
    
    @register.simple_tag(takes_context=True)
    def create_ws_token(context):
        request = context['request']
        if not request.user.is_authenticated():
            return 'Not authed'
        user = request.user.username
        current_time = datetime.datetime.strftime(
                datetime.datetime.utcnow(),
                "%d:%m:%Y:%H:%M:%S"
        )
        token = 'wstoken' + uuid.uuid4().hex
        output = {
           'user': user,
           'time': current_time
        }
        redis_conn.set(token, json.dumps(output))
        return token
    
    
  3. Put the following snippet inside TEMPLATES -> OPTIONS in the project's settings:
    'libraries': {
        'create_ws_token': 'project_name.templatetags.create_ws_tokens',
    },
    
  4. Now to use this template tag in any template, you will need to load it.
    {% load create_ws_token %}
    and

    <script>
        var token = '{% create_ws_token %}';
        if (token.startsWith('wstoken')) {
            socket(token);
        }
    </script>
    

socket is a function defined in another js file which creates the websocket connection, along these lines:


ws = create_ws("ws://localhost:8080/wsb?ws_token="+ws_token);

On the Tornado side, we need to get the ws_token and query redis to verify it.


def open(self):
    ''' Called by tornado when a new connection opens up '''
    self.user = None
    if 'ws_token' in self.request.arguments:
        # query arguments come in as bytes
        token = self.request.arguments['ws_token'][0].decode()
        self.user = self.authenticate(token)
        if self.user:
            tsockets.add_socket(self.user, self)
            print('New connection from browser!')
        else:
            self.close()
    else:
        self.close()

The authenticate method looks like this:


def authenticate(self, token):
    ''' Check for authentic token in redis '''
    inredis = self.application.redis_conn.get(token)
    if inredis:
        inredis = json.loads(inredis)
        self.application.redis_conn.delete(token)
        current_time = datetime.datetime.utcnow()
        valid_time = current_time - datetime.timedelta(seconds=20)
        inredis_time = datetime.datetime.strptime(
            inredis['time'], "%d:%m:%Y:%H:%M:%S"
        )
        if valid_time <= inredis_time:
            return inredis['user']
    return False

I chose redis because Tornado is a single-threaded server, and connecting to a database that is not async results in blocking calls, which would affect the real-time features.

That’s it.

Using C function from Python

ctypes is a Python library that allows using C data types and functions from a Python script. It is part of the standard library. To use C functions via ctypes, you need to compile the C code into a shared library.

add.c


#include <stdio.h>

int add_two_numbers(int num1, int num2) {
    return num1 + num2;
}

I will be using a very simple C function in this case, which adds two given numbers.

Now compile this file using:
gcc -fPIC -shared -o libadd2nums.so add.c

This will create a shared library named libadd2nums.so which, for now, contains only one function.

add.py


# coding=utf-8

import ctypes

_add = ctypes.CDLL('/home/vivek/ctypestuts/libadd2nums.so')
_add.add_two_numbers.argtypes = (ctypes.c_int, ctypes.c_int)

def add_two_numbers(num1, num2):
    ''' Adds two numbers '''
    return _add.add_two_numbers(ctypes.c_int(num1), ctypes.c_int(num2))

I am using Fedora 26. If you are using Windows, you will need to use ctypes.WinDLL.

_add here is the shared library, and we can access the C function using the dot (.) operator.
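
One gotcha worth knowing: ctypes assumes every function returns an int unless you set restype (which is why add_two_numbers works without it). You can see argtypes and restype in action without compiling anything by calling into the system math library; this sketch assumes a Unix-like system where libm is available:

```python
import ctypes
import ctypes.util

# Load the system C math library (Unix-like systems).
libm = ctypes.CDLL(ctypes.util.find_library('m') or 'libm.so.6')

libm.pow.argtypes = (ctypes.c_double, ctypes.c_double)
libm.pow.restype = ctypes.c_double  # without this, the double return value is garbled

print(libm.pow(2.0, 10.0))  # 1024.0
```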

main.py


# coding=utf-8

import add

num1 = int(input("Enter num1: "))
num2 = int(input("Enter num2: "))
print(add.add_two_numbers(num1, num2))

That’s it.