
Commit ca6200d

Support deleting timer task results automatically (#253)
* Support deleting timer task results automatically
* Add tests for delete_outdated_task_results
* Update tests for delete_outdated_task_results
* Fix tests for delete_outdated_task_results
1 parent 45a440b commit ca6200d

9 files changed: 126 additions, 10 deletions

scrapydweb/default_settings.py

Lines changed: 20 additions & 3 deletions
@@ -117,7 +117,7 @@
 ############################## LogParser ######################################
 # Whether to backup the stats json files locally after you visit the Stats page of a job
 # so that it is still accessible even if the original logfile has been deleted.
-# The default is True, set it to False to disable this behaviour.
+# The default is True, set it to False to disable this behavior.
 BACKUP_STATS_JSON_FILE = True


@@ -127,11 +127,28 @@

 # The default is 300, which means ScrapydWeb would automatically create a snapshot of the Jobs page
 # and save the jobs info in the database in the background every 300 seconds.
-# Note that this behaviour would be paused if the scheduler for timer tasks is disabled.
-# Set it to 0 to disable this behaviour.
+# Note that this behavior would be paused if the scheduler for timer tasks is disabled.
+# Set it to 0 to disable this behavior.
 JOBS_SNAPSHOT_INTERVAL = 300


+# The default is 300, which means ScrapydWeb would automatically check the amount of task results of all timer tasks
+# in the background every 300 seconds to delete some outdated records in the database.
+# This option works only when either KEEP_TASK_RESULT_LIMIT or KEEP_TASK_RESULT_WITHIN_DAYS is not 0.
+# Note that this behavior would be paused if the scheduler for timer tasks is disabled.
+# Set it to 0 to disable this behavior.
+CHECK_TASK_RESULT_INTERVAL = 300
+
+# The default is 1000, which means only the latest 1000 timer task results would not be deleted from the database.
+# See also CHECK_TASK_RESULT_INTERVAL. Set it to 0 to disable this behavior.
+KEEP_TASK_RESULT_LIMIT = 1000
+
+# The default is 31, which means only the timer task results executed within recent 31 days
+# would not be deleted from the database.
+# See also CHECK_TASK_RESULT_INTERVAL. Set it to 0 to disable this behavior.
+KEEP_TASK_RESULT_WITHIN_DAYS = 31
+
+
 ############################## Run Spider #####################################
 # The default is False, set it to True to automatically
 # expand the 'settings & arguments' section in the Run Spider page.
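
Taken together, the three new options define a simple retention rule: a task result that has actually been executed is kept only while it is both among the newest KEEP_TASK_RESULT_LIMIT results and no older than KEEP_TASK_RESULT_WITHIN_DAYS days; setting either option to 0 drops that criterion. A minimal sketch of the rule, using a hypothetical should_keep() helper rather than ScrapydWeb's actual code:

    from datetime import datetime, timedelta

    # Hypothetical helper illustrating the retention rule described by the settings above.
    # rank_newest_first: 0 for the most recent task result, 1 for the next, and so on.
    def should_keep(rank_newest_first, execute_time, limit=1000, within_days=31, now=None):
        now = now or datetime.now()
        if limit and rank_newest_first >= limit:
            return False  # beyond the newest `limit` executed results
        if within_days and execute_time <= now - timedelta(days=within_days):
            return False  # executed too long ago
        return True

    # With the defaults, a result executed 40 days ago is deleted even if it is
    # among the latest 1000, because KEEP_TASK_RESULT_WITHIN_DAYS = 31.
    print(should_keep(5, datetime.now() - timedelta(days=40)))  # False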

scrapydweb/run.py

Lines changed: 10 additions & 3 deletions
@@ -8,6 +8,7 @@
 from flask import request

 # from . import create_app  # --debug: ImportError: cannot import name 'create_app'
+# python -m scrapydweb.run
 from scrapydweb import create_app
 from scrapydweb.__version__ import __description__, __version__
 from scrapydweb.common import authenticate, find_scrapydweb_settings_py, handle_metadata, handle_slash
@@ -137,9 +138,15 @@ def load_custom_settings(config):
             file=SCRAPYDWEB_SETTINGS_PY))
     else:
         sys.exit("\nATTENTION:\nYou may encounter ERROR if there are any running timer tasks added in v1.2.0,\n"
-                 "and you have to restart scrapydweb and manually edit the tasks to resume them.\n"
-                 "\nThe config file '{file}' has been copied to current working directory.\n"
-                 "Please add your SCRAPYD_SERVERS in the config file and restart scrapydweb.\n".format(
+                 "and you have to restart scrapydweb and manually edit the tasks to resume them.\n\n"
+                 "The config file '{file}' has been copied to current working directory.\n"
+                 "Please add your SCRAPYD_SERVERS in the config file and restart scrapydweb.\n\n"
+                 "New options to control the amount of task results of all timer tasks:\n"
+                 "##########\n"
+                 "CHECK_TASK_RESULT_INTERVAL = 300\n"
+                 "KEEP_TASK_RESULT_LIMIT = 1000\n"
+                 "KEEP_TASK_RESULT_WITHIN_DAYS = 31\n"
+                 "##########\n".format(
             file=SCRAPYDWEB_SETTINGS_PY))


scrapydweb/templates/scrapydweb/settings.html

Lines changed: 3 additions & 0 deletions
@@ -95,6 +95,9 @@ <h3>Timer tasks</h3>
 <ul class="collapse">
 <li><div class="title"><h4>scheduler.state: {{ scheduler_state }}</h4></div></li>
 <li><div class="title"><h4>JOBS_SNAPSHOT_INTERVAL = {{ JOBS_SNAPSHOT_INTERVAL }}</h4></div></li>
+<li><div class="title"><h4>CHECK_TASK_RESULT_INTERVAL = {{ CHECK_TASK_RESULT_INTERVAL }}</h4></div></li>
+<li><div class="title"><h4>KEEP_TASK_RESULT_LIMIT = {{ KEEP_TASK_RESULT_LIMIT }}</h4></div></li>
+<li><div class="title"><h4>KEEP_TASK_RESULT_WITHIN_DAYS = {{ KEEP_TASK_RESULT_WITHIN_DAYS }}</h4></div></li>
 </ul>
 </div>

scrapydweb/utils/check_app_config.py

Lines changed: 35 additions & 1 deletion
@@ -307,6 +307,28 @@ def check_assert(key, default, is_instance, allow_zero=True, non_empty=False, co
                                   trigger='interval', seconds=JOBS_SNAPSHOT_INTERVAL,
                                   misfire_grace_time=60, coalesce=True, max_instances=1, jobstore='memory'))

+    check_assert('CHECK_TASK_RESULT_INTERVAL', 300, int)
+    check_assert('KEEP_TASK_RESULT_LIMIT', 1000, int)
+    check_assert('KEEP_TASK_RESULT_WITHIN_DAYS', 31, int)
+    CHECK_TASK_RESULT_INTERVAL = config.get('CHECK_TASK_RESULT_INTERVAL', 300)
+    KEEP_TASK_RESULT_LIMIT = config.get('KEEP_TASK_RESULT_LIMIT', 1000)
+    KEEP_TASK_RESULT_WITHIN_DAYS = config.get('KEEP_TASK_RESULT_WITHIN_DAYS', 31)
+
+    logger.info('CHECK_TASK_RESULT_INTERVAL: %s' % CHECK_TASK_RESULT_INTERVAL)
+    logger.info('KEEP_TASK_RESULT_LIMIT: %s' % KEEP_TASK_RESULT_LIMIT)
+    logger.info('KEEP_TASK_RESULT_WITHIN_DAYS: %s' % KEEP_TASK_RESULT_WITHIN_DAYS)
+    if CHECK_TASK_RESULT_INTERVAL and (KEEP_TASK_RESULT_LIMIT or KEEP_TASK_RESULT_WITHIN_DAYS):
+        username = config.get('USERNAME', '')
+        password = config.get('PASSWORD', '')
+        kwargs = dict(
+            url=config['URL_SCRAPYDWEB'] + handle_metadata().get('url_delete_task_result',
+                                                                 '/1/tasks/xhr/delete/1/2/'),
+            auth=(username, password) if username and password else None,
+        )
+        logger.info(scheduler.add_job(id='delete_task_result', replace_existing=True,
+                                      func=delete_task_result, args=None, kwargs=kwargs,
+                                      trigger='interval', seconds=CHECK_TASK_RESULT_INTERVAL,
+                                      misfire_grace_time=60, coalesce=True, max_instances=1, jobstore='memory'))
     # Subprocess
     init_subprocess(config)

@@ -323,6 +345,17 @@ def create_jobs_snapshot(url_jobs, auth, nodes):
     # print(url_jobs, r.status_code)


+def delete_task_result(url, auth):
+    url = re.sub(r'(\d+/)+$', '', url)
+    try:
+        r = session.post(url, auth=auth, timeout=60)
+        assert r.status_code == 200, "Request got status_code: %s" % r.status_code
+    except Exception as err:
+        print("Fail to delete task result: %s\n%s" % (url, err))
+    # else:
+    # print('delete_task_result', url, r.status_code, r.json())
+
+
 def check_scrapyd_servers(config):
     SCRAPYD_SERVERS = config.get('SCRAPYD_SERVERS', []) or ['127.0.0.1:6800']
     SCRAPYD_SERVERS_PUBLIC_URLS = config.get('SCRAPYD_SERVERS_PUBLIC_URLS', None) or [''] * len(SCRAPYD_SERVERS)
@@ -368,11 +401,12 @@ def check_connectivity(server):
         try:
             url = 'http://%s:%s' % (_ip, _port)
             r = session.get(url, auth=_auth, timeout=10)
-            assert r.status_code == 200, "%s got status_code %s" % (url, r.status_code)
+            assert r.status_code == 200, "%s with auth %s got status_code %s" % (url, _auth, r.status_code)
         except Exception as err:
            logger.error(err)
            return False
         else:
+            logger.debug("%s with auth %s got status_code %s" % (url, _auth, r.status_code))
            return True

     # with ThreadPool(min(len(servers), 100)) as pool:  # Works in python 3.3 and up
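
The cleanup itself is just another interval job on ScrapydWeb's APScheduler instance: every CHECK_TASK_RESULT_INTERVAL seconds it POSTs to the internal tasks endpoint (with the trailing task/result ids stripped by the re.sub above), and the view then performs the actual deletions. The same pattern can be reproduced in isolation; the snippet below is a sketch only, assuming a plain BackgroundScheduler, the requests library, and a placeholder ScrapydWeb URL with auth disabled:

    # Sketch of the scheduling pattern, not ScrapydWeb's own code: the real job is
    # registered on ScrapydWeb's scheduler with jobstore='memory' and its session object.
    from apscheduler.schedulers.background import BackgroundScheduler
    import requests

    def delete_task_result(url, auth=None):
        try:
            r = requests.post(url, auth=auth, timeout=60)
            r.raise_for_status()
        except Exception as err:
            print("Fail to delete task result: %s\n%s" % (url, err))

    scheduler = BackgroundScheduler()
    scheduler.add_job(delete_task_result, trigger='interval', seconds=300,
                      id='delete_task_result', replace_existing=True,
                      coalesce=True, max_instances=1, misfire_grace_time=60,
                      kwargs=dict(url='http://127.0.0.1:5000/1/tasks/xhr/delete/'))  # assumed URL
    scheduler.start()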

scrapydweb/vars.py

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@

 PYTHON_VERSION = '.'.join([str(n) for n in sys.version_info[:3]])
 PY2 = sys.version_info.major < 3
-SCRAPYDWEB_SETTINGS_PY = 'scrapydweb_settings_v10.py'
+SCRAPYDWEB_SETTINGS_PY = 'scrapydweb_settings_v11.py'
 sys.path.append(os.getcwd())
 try:
     custom_settings_module = importlib.import_module(os.path.splitext(SCRAPYDWEB_SETTINGS_PY)[0])

scrapydweb/views/baseview.py

Lines changed: 3 additions & 0 deletions
@@ -111,6 +111,9 @@ def __init__(self, *args, **kwargs):
         # Timer Tasks
         self.scheduler = scheduler
         self.JOBS_SNAPSHOT_INTERVAL = app.config.get('JOBS_SNAPSHOT_INTERVAL', 300)
+        self.CHECK_TASK_RESULT_INTERVAL = app.config.get('CHECK_TASK_RESULT_INTERVAL', 300)
+        self.KEEP_TASK_RESULT_LIMIT = app.config.get('KEEP_TASK_RESULT_LIMIT', 1000)
+        self.KEEP_TASK_RESULT_WITHIN_DAYS = app.config.get('KEEP_TASK_RESULT_WITHIN_DAYS', 31)

         # Run Spider
         self.SCHEDULE_EXPAND_SETTINGS_ARGUMENTS = app.config.get('SCHEDULE_EXPAND_SETTINGS_ARGUMENTS', False)

scrapydweb/views/overview/tasks.py

Lines changed: 41 additions & 2 deletions
@@ -1,10 +1,11 @@
 # coding: utf-8
-from datetime import datetime
+from datetime import datetime, timedelta
 import json
 import logging
 import traceback

 from flask import Blueprint, flash, render_template, request, send_file, url_for
+from sqlalchemy import and_

 from ...common import handle_metadata
 from ...models import Task, TaskResult, TaskJobResult, db
@@ -272,6 +273,7 @@ def __init__(self):
         self.task = Task.query.get(self.task_id) if self.task_id else None
         self.apscheduler_job = self.scheduler.get_job(str(self.task_id)) if self.task_id else None  # Return type: Job|None
         self.js = dict(action=self.action, task_id=self.task_id, task_result_id=self.task_result_id, url=request.url)
+        # self.logger.warning(self.js)

     def dispatch_request(self, **kwargs):
         try:
@@ -293,8 +295,10 @@ def generate_response(self):
         elif self.action == 'delete':  # delete a task_result|task
             if self.task_result_id:
                 self.delete_task_result()
-            else:
+            elif self.task_id:
                 self.delete_task()
+            else:
+                self.delete_outdated_task_results()
         elif self.action == 'dump':  # For test only
             self.dump_task_data()
         elif self.action == 'fire':  # update next_run_time
@@ -425,3 +429,38 @@ def list_tasks_or_results(self):
         else:
             records = Task.query.all()
         self.js['ids'] = [i.id for i in records]
+
+    def delete_outdated_task_results(self):
+        # The condition equals to: pass_count != 0 or fail_count != 0
+        condition = ~and_(TaskResult.pass_count == 0, TaskResult.fail_count == 0)
+
+        if self.KEEP_TASK_RESULT_LIMIT:
+            count_before = TaskResult.query.count()
+            task_results = TaskResult.query.filter(condition).order_by(
+                TaskResult.execute_time.desc()).offset(self.KEEP_TASK_RESULT_LIMIT).all()
+            for task_result in task_results:
+                self.logger.debug("delete TaskResult: %s" % task_result)
+                db.session.delete(task_result)
+            db.session.commit()
+            count_after = TaskResult.query.count()
+            self.logger.info("KEEP_TASK_RESULT_LIMIT: %s, total TaskResult: from %s to %s" % (
+                self.KEEP_TASK_RESULT_LIMIT, count_before, count_after))
+            self.js.update(amount_limit=dict(KEEP_TASK_RESULT_LIMIT=self.KEEP_TASK_RESULT_LIMIT,
+                                             count_before=count_before, count_after=count_after))
+
+        if self.KEEP_TASK_RESULT_WITHIN_DAYS:
+            count_before = TaskResult.query.count()
+            # timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)
+            n_days_ago = datetime.now() - timedelta(days=self.KEEP_TASK_RESULT_WITHIN_DAYS)
+
+            task_results = TaskResult.query.filter(TaskResult.execute_time <= n_days_ago, condition).all()
+            for task_result in task_results:
+                self.logger.debug("delete TaskResult: %s" % task_result)
+                db.session.delete(task_result)
+            db.session.commit()
+
+            count_after = TaskResult.query.count()
+            self.logger.info("KEEP_TASK_RESULT_WITHIN_DAYS: %s, total TaskResult: from %s to %s" % (
+                self.KEEP_TASK_RESULT_WITHIN_DAYS, count_before, count_after))
+            self.js.update(day_limit=dict(KEEP_TASK_RESULT_WITHIN_DAYS=self.KEEP_TASK_RESULT_WITHIN_DAYS,
+                                          count_before=count_before, count_after=count_after))
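
In short, delete_outdated_task_results never touches results whose pass_count and fail_count are both 0, first trims everything beyond the newest KEEP_TASK_RESULT_LIMIT executed results, and then removes executed results older than KEEP_TASK_RESULT_WITHIN_DAYS days. The offset/cutoff pattern can be tried in isolation; below is a self-contained sketch against an in-memory SQLite database with a throwaway TaskResult model (not ScrapydWeb's real model, and assuming SQLAlchemy 1.4+), using a limit of 20 and a window of 31 days so both passes are visible:

    from datetime import datetime, timedelta

    from sqlalchemy import Column, DateTime, Integer, and_, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class TaskResult(Base):  # throwaway stand-in for ScrapydWeb's model
        __tablename__ = 'task_result'
        id = Column(Integer, primary_key=True)
        execute_time = Column(DateTime)
        pass_count = Column(Integer, default=0)
        fail_count = Column(Integer, default=0)

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        now = datetime.now()
        # 60 executed results, one per day going back 59 days
        session.add_all([TaskResult(execute_time=now - timedelta(days=d), pass_count=1)
                         for d in range(60)])
        session.commit()

        # Only results that have actually run; pending ones are never deleted.
        executed = ~and_(TaskResult.pass_count == 0, TaskResult.fail_count == 0)

        # Pass 1: everything beyond the newest 20 executed results (KEEP_TASK_RESULT_LIMIT).
        beyond_limit = session.query(TaskResult).filter(executed).order_by(
            TaskResult.execute_time.desc()).offset(20).all()
        # Pass 2: executed results older than 31 days (KEEP_TASK_RESULT_WITHIN_DAYS).
        too_old = session.query(TaskResult).filter(
            TaskResult.execute_time <= now - timedelta(days=31), executed).all()

        for task_result in set(beyond_limit) | set(too_old):
            session.delete(task_result)
        session.commit()
        print(session.query(TaskResult).count())  # 20 results survive both passes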

scrapydweb/views/system/settings.py

Lines changed: 3 additions & 0 deletions
@@ -90,6 +90,9 @@ def update_kwargs(self):
         # Timer Tasks
         self.kwargs['scheduler_state'] = SCHEDULER_STATE_DICT[self.scheduler.state]
         self.kwargs['JOBS_SNAPSHOT_INTERVAL'] = self.JOBS_SNAPSHOT_INTERVAL
+        self.kwargs['CHECK_TASK_RESULT_INTERVAL'] = self.CHECK_TASK_RESULT_INTERVAL
+        self.kwargs['KEEP_TASK_RESULT_LIMIT'] = self.KEEP_TASK_RESULT_LIMIT
+        self.kwargs['KEEP_TASK_RESULT_WITHIN_DAYS'] = self.KEEP_TASK_RESULT_WITHIN_DAYS

         # Run Spider
         self.kwargs['run_spider_details'] = self.json_dumps(dict(

tests/test_tasks_single_scrapyd.py

Lines changed: 10 additions & 0 deletions
@@ -8,6 +8,7 @@
 from six.moves.urllib.parse import unquote_plus
 from tzlocal import get_localzone

+from scrapydweb.utils.check_app_config import check_app_config
 from tests.utils import cst, req_single_scrapyd, sleep, upload_file_deploy


@@ -867,3 +868,12 @@ def test_history(app, client):
         "assert js['status_code'] == 200 and js['status'] == 'ok'",
         "Task #%s deleted" % task_id,
     ])
+
+def test_check_task_result_interval(app, client):
+    app.config['ENABLE_MONITOR'] = False
+    app.config['CHECK_TASK_RESULT_INTERVAL'] = 5
+    app.config['SCRAPYD_SERVERS'] = app.config['_SCRAPYD_SERVERS']
+    check_app_config(app.config)
+    sleep(8)
+    __, js = req_single_scrapyd(app, client, view='tasks.xhr', kws=dict(node=NODE, action='delete'))
+    print("test_check_task_result_interval: %s" % js)
