limited number of user-initiated background processes limited number of user-initiated background processes django django

limited number of user-initiated background processes


I have two solutions for this particular case, one an out of the box solution by celery, and another one that you implement yourself.

  1. You can do something like this with celery workers. In particular, you only create two worker processes with concurrency=1 (or well, one with concurrency=2, but that's gonna be threads, not different processes), this way, only two jobs can be done asynchronously. Now you need a way to raise exceptions if both jobs are occupied, then you use inspect, to count the number of active tasks and throw exceptions if required. For implementation, you can checkout this SO post.

You might also be interested in rate limits.

  1. You can do it all yourself, using a locking solution of choice. In particular, a nice implementation that makes sure only two processes are running with redis (and redis-py) is as simple as the following. (Considering you know redis, since you know celery)

    from redis import StrictRedisredis = StrictRedis('localhost', '6379')locks = ['compute:lock1', 'compute:lock2']for key in locks:    lock = redis.lock(key, blocking_timeout=5)    acquired = lock.acquire()    if acquired:        do_huge_computation()        lock.release()        break    print("Gonna try next possible slot")if not acquired:    raise SystemLimitsReached("Already at max capacity !")

This way you make sure only two running processes can exist in the system. A third processes will block in the line lock.acquire() for blocking_timeout seconds, if the locking was successful, acquired would be True, else it's False and you'd tell your user to wait !

I had the same requirement sometime in the past and what I ended up coding was something like the solution above. In particular

  1. This has the least amount of race conditions possible
  2. It's easy to read
  3. Doesn't depend on a sysadmin, suddenly doubling the concurrency of workers under load and blowing up the whole system.
  4. You can also implement the limit per user, meaning each user can have 2 simultaneous running jobs, by only changing the lock keys from compute:lock1 to compute:userId:lock1 and lock2 accordingly. You can't do this one with vanila celery.


First of all you need to limit concurrency on your worker (docs):

celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>

This will help to make sure that you do not have more than 2 active tasks even if you will have errors in the code.

Now you have 2 ways to implement task number validation:

  1. You can use inspect to get number of active and scheduled tasks:

     from celery import current_app def start_job():      inspect = current_app.control.inspect()      active_tasks = inspect.active() or {}      scheduled_tasks = inspect.scheduled() or {}      worker_key = 'celery@%s' % <worker_name>      worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])      if len(worker_tasks) >= 2:          raise MyCustomException('It is impossible to start more than 2 tasks.')       else:          my_task.delay()
  2. You can store number of currently executing tasks in DB and validate task execution based on it.

Second approach could be better if you want to scale your functionality - introduce premium users or do not allow to execute 2 requests by one user.


First

You need the first part of SpiXel's solution. According to him, "you only create two worker processes with concurrency=1".

Second

Set the time out for the task waiting in the queue, which is set CELERY_EVENT_QUEUE_TTL and the queue length limit according to how to limit number of tasks in queue and stop feeding when full?.

Therefore, when the two work running jobs, and the task in the queue waiting like 10 sec or any period time you like, the task will be time out. Or if the queue has been fulfilled, new arrival tasks will be dropped out.

Third

you need extra things to deal with notifying "users when jobs are rejected because the server is busy".

Dead Letter Exchanges is what you need. Every time a task is failed because of the queue length limit or message timeout. "Messages will be dropped or dead-lettered from the front of the queue to make room for new messages once the limit is reached."

You can set "x-dead-letter-exchange" to route to another queue, once this queue receive the dead lettered message, you can send a notification message to users.