celery worker를 구성했는데, task를 추가하기 전에 똑같은 task가 celery queue에 등록돼있는지 확인하고 추가하는 작업이 필요해졌습니다.

이게 베스트인지는 잘 모르겠는데.. 아무튼 엄청 중요한 task는 아니라서 (날아가도 됨) redis lock으로 적당히 구현했습니다

사용한 패키지는 redis-py, celery입니다.

 

import redis
import redis.lock
from celery import shared_task, Task


redis_conn: Union[redis.Redis, None] = None
redis_lock: Union[redis.lock.Lock, None] = None


class RedisTask(Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # cleanup
        global redis_conn, redis_lock
        if redis_lock and redis_lock.owned():
            redis_lock.release()
        if redis_conn:
            redis_conn.close()

        super().after_return(status, retval, task_id, args, kwargs, einfo)


@shared_task(base=RedisTask)
def my_task(some_param: SomeParam):
    global redis_conn, redis_lock
    redis_conn = redis.from_url(url="...")
    redis_lock = redis_conn.lock(str(some_param.id), timeout=600)

    if not redis_lock.acquire(blocking=True):
        raise Exception("LOCK IN USE")
    
    # task

대략 이런식인데요, lock의 구현방식을 보면 redis 커맨드 중 setnx랑 del을 이용해 구현되어 있습니다.

간단하게 구현할 수 있어서 좋네요

반응형