Did I find the right examples for you? yes no      Crawl my project      Python Jobs

All Samples(4)  |  Call(4)  |  Derive(0)  |  Import(0)
The backend cleanup task can be used to clean up the default result
backend.

If the configured backend requires periodic cleanup this task is also
automatically configured to run every day at midnight (requires
:program:`celery beat` to be running).

        @connect_on_app_finalize
def add_backend_cleanup_task(app):
    """The backend cleanup task can be used to clean up the default result
    backend.

    If the configured backend requires periodic cleanup this task is also
    automatically configured to run every day at midnight (requires
    :program:`celery beat` to be running).

    """
    @app.task(name='celery.backend_cleanup',
              shared=False, _force_evaluate=True)
    def backend_cleanup():
        app.backend.cleanup()
    return backend_cleanup
        


src/f/i/firefox-flicks-HEAD/vendor-local/lib/python/celery/tests/app/test_builtins.py   firefox-flicks(Download)
        app.backend.cleanup.__name__ = 'cleanup'
        try:
            cleanup_task = builtins.add_backend_cleanup_task(app)
            cleanup_task()
            self.assertTrue(app.backend.cleanup.called)

src/c/e/celery-HEAD/celery/tests/app/test_builtins.py   celery(Download)
    def test_run(self):
        self.app.backend.cleanup = Mock()
        self.app.backend.cleanup.__name__ = 'cleanup'
        cleanup_task = builtins.add_backend_cleanup_task(self.app)
        cleanup_task()

src/a/n/antisocial-HEAD/ve/lib/python2.7/site-packages/celery/tests/app/test_builtins.py   antisocial(Download)
        app.backend.cleanup.__name__ = 'cleanup'
        try:
            cleanup_task = builtins.add_backend_cleanup_task(app)
            cleanup_task()
            self.assertTrue(app.backend.cleanup.called)

src/c/e/celery-3.1.11/celery/tests/app/test_builtins.py   celery(Download)
    def test_run(self):
        self.app.backend.cleanup = Mock()
        self.app.backend.cleanup.__name__ = 'cleanup'
        cleanup_task = builtins.add_backend_cleanup_task(self.app)
        cleanup_task()