Если вы решили поменять сериализатор задач для Celery с Pickle на какой‑нибудь другой, ничего не выйдет. Вернее, задачи действительно будут сериализоваться и десериализоваться с помощью указанного сериализатора. Но внутри Celery всё равно используется Pickle, и это никак не изменить. Вернее, не в Celery, а в библиотеке billiard, которая используется мастер‑процессом воркера для раздачи задач из очереди подпроцессам.
В нашем проекте мы столкнулись с тем, что иногда передаём в качестве параметра задач Celery NoneType. Причём не значение None, а именно NoneType.
from celery import Celery
app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')
@app.task
def print_xy(x):
print(type(x))
if __name__ == '__main__':
print_xy.delay(type(None))
Не знаю, как в Python 3, но в Python 2.7 за подобные вольности наказывают:
> python app.py
PicklingError: Can't pickle <type 'NoneType'>
Я решил обойти проблему с NoneType за счёт смены сериализатора на JSON. Это делается достаточно легко: я научил его сериализовать и десериализовать нужные нам типы данных (datetime и тому подобное). Код начал исполняться, вот только задачи пропадали, а воркеры «залипали». Оказалось, что проблема переехала в мастер‑процесс воркера. Всё так же:
Task Handler ERROR: PicklingError("Can't pickle <type 'NoneType'>: attribute lookup __builtin__.NoneType failed",)
Подмена стандартного pickle на dill в Kombu (issue #2526 на GitHub), предложенная maximilianr, не поможет. Библиотека billiard выбирает между pypickle и cPickle, и ей всё равно, что прописано в kombu.serialization.pickle.
from celery import Celery
import dill as dill
from io import BytesIO
import kombu
def add_dill():
registry = kombu.serialization.registry
kombu.serialization.pickle = dill
registry.unregister('pickle')
def pickle_loads(s, load=dill.load):
return load(BytesIO(s))
def pickle_dumps(obj, dumper=dill.dumps):
return dumper(obj, protocol=kombu.serialization.pickle_protocol)
registry.register('pickle', pickle_dumps, pickle_loads,
content_type='application/x-python-serialize',
content_encoding='binary')
add_dill() # Celery игнорирует это
app = Celery('app', broker='mongodb://localhost:27017/celery_dumps')
@app.task
def print_xy(x):
print(type(x))
if __name__ == '__main__':
print_xy.delay(type(None))
Остаётся только метать лучи гнева, переписывать код так, чтобы он был pickle‑friendly, либо грязными хаками менять поведение стандартного pickle.