celery worker를 구성했는데, task를 추가하기 전에 똑같은 task가 celery queue에 등록돼있는지 확인하고 추가하는 작업이 필요해졌습니다.
이게 베스트인지는 잘 모르겠는데.. 아무튼 엄청 중요한 task는 아니라서 (날아가도 됨) redis lock으로 적당히 구현했습니다
사용한 패키지는 redis-py, celery입니다.
import redis
import redis.lock
from celery import shared_task, Task
redis_conn: Union[redis.Redis, None] = None
redis_lock: Union[redis.lock.Lock, None] = None
class RedisTask(Task):
abstract = True
def after_return(self, status, retval, task_id, args, kwargs, einfo):
# cleanup
global redis_conn, redis_lock
if redis_lock and redis_lock.owned():
redis_lock.release()
if redis_conn:
redis_conn.close()
super().after_return(status, retval, task_id, args, kwargs, einfo)
@shared_task(base=RedisTask)
def my_task(some_param: SomeParam):
global redis_conn, redis_lock
redis_conn = redis.from_url(url="...")
redis_lock = redis_conn.lock(str(some_param.id), timeout=600)
if not redis_lock.acquire(blocking=True):
raise Exception("LOCK IN USE")
# task
대략 이런식인데요, lock의 구현방식을 보면 redis 커맨드 중 setnx랑 del을 이용해 구현되어 있습니다.
간단하게 구현할 수 있어서 좋네요
반응형
'프로그래밍 > Python' 카테고리의 다른 글
[Python] Paramiko GSSAPI 사용법 (0) | 2023.10.09 |
---|---|
[Jupyter Notebook] 줄바꿈(word wrap, 워드랩) 적용하는 법 (0) | 2022.08.08 |
[conda] base environment activate 기본동작 막기 (0) | 2022.08.05 |
[Jupyter Notebook] 탭 없이 자동완성 하는법 (jupyter_contrib_nbextensions) (0) | 2022.08.05 |
[Python] m1 mac에서 homebrew로 conda 설치 및 jupyter 세팅 (0) | 2022.08.04 |