Commit 6114d0f9 authored by Rémi Duraffort, committed by Neil Williams
Browse files

Rewrite the job scheduler from scratch

This patch is made of two parts:
1/ change the Device and TestJob state machine
2/ rewrite the TestJob scheduler

Tests for the scheduler and the state machine are still missing.

Change-Id: I80f55c0ba89f978875ab079d2bf595a3c963017b
parent 320cb918
......@@ -1197,7 +1197,7 @@ class QueryCondition(models.Model):
# Allowed fields for condition entities.
FIELD_CHOICES = {
TestJob: [
"submitter", "start_time", "end_time", "status", "actual_device",
"submitter", "start_time", "end_time", "state", "health", "actual_device",
"requested_device_type", "health_check", "user", "group",
"priority", "is_pipeline", "description"],
TestSuite: ["name"],
......
......@@ -7,8 +7,8 @@
<h2>LAVA Results for {{ job_link }}
{% if not failed_definitions %}
<span
class="label {% if job.status == job.COMPLETE %}label-success{% else %}label-warning{% endif %}"
{% if job.status == job.COMPLETE %}title="All submitted definitions have started execution."{% endif %}>{{ job.get_status_display }}
class="label {% if job.health == job.HEALTH_COMPLETE %}label-success{% else %}label-warning{% endif %}"
{% if job.health == job.HEALTH_COMPLETE %}title="All submitted definitions have started execution."{% endif %}>{{ job.get_state_display }}
</span>
{% endif %}
</h2>
......
......@@ -129,7 +129,7 @@ def testjob(request, job):
# some duplicates can exist, so get would fail here and [0] is quicker than try except.
testdata = TestData.objects.filter(
testjob=job).prefetch_related('actionlevels__testcase', 'actionlevels__testcase__suite')[0]
if job.status in [TestJob.INCOMPLETE, TestJob.COMPLETE]:
if job.state == TestJob.STATE_FINISHED:
# returns something like ['singlenode-advanced', 'smoke-tests-basic', 'smoke-tests-basic']
executed = [
{
......
......@@ -730,9 +730,8 @@ def _export_query(query_results, content_type, filename):
removed_fields = [
# TestJob fields:
"_results_link", "user_id", "actual_device_id", "definition",
"group_id", "id", "original_definition", "requested_device_id",
"sub_id", "submit_token", "submit_token_id", "submitter_id",
"testdata", "testsuite",
"group_id", "id", "original_definition",
"sub_id", "submitter_id", "testdata", "testsuite",
# TestSuite fields:
"job_id",
# TestCase fields:
......
......@@ -2,8 +2,10 @@ from django import forms
from django.core.exceptions import ValidationError
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from django.db.models import Q
from lava_scheduler_app.models import (
Device, DeviceStateTransition, DeviceType, TestJob, Tag, JobFailureTag,
Device, DeviceType, TestJob, Tag, JobFailureTag,
User, Worker, DefaultDeviceOwner,
Architecture, ProcessorFamily, Alias, BitWidth, Core
)
......@@ -51,48 +53,6 @@ admin.site.unregister(User)
admin.site.register(User, UserAdmin)
def offline_action(modeladmin, request, queryset):  # pylint: disable=unused-argument
    """Admin action: put the selected devices into maintenance mode.

    Only devices whose status is IDLE, RUNNING or RESERVED are considered,
    and devices the requesting user cannot administer are skipped.
    """
    eligible_statuses = [Device.IDLE, Device.RUNNING, Device.RESERVED]
    for device in queryset.filter(status__in=eligible_statuses):
        if not device.can_admin(request.user):
            continue
        device.put_into_maintenance_mode(request.user, "admin action")


offline_action.short_description = "take offline"
def online_action(modeladmin, request, queryset):  # pylint: disable=unused-argument
    """Admin action: put the selected devices back into online mode.

    Only devices whose status is OFFLINE or OFFLINING are considered, and
    devices the requesting user cannot administer are skipped.
    """
    offline_statuses = [Device.OFFLINE, Device.OFFLINING]
    for device in queryset.filter(status__in=offline_statuses):
        if not device.can_admin(request.user):
            continue
        device.put_into_online_mode(request.user, "admin action")


online_action.short_description = "take online"
def online_action_without_health_check(modeladmin, request, queryset):  # pylint: disable=unused-argument,invalid-name
    """Admin action: put the selected devices online, passing True as the
    third argument of put_into_online_mode.

    Only devices whose status is OFFLINE or OFFLINING are considered, and
    devices the requesting user cannot administer are skipped.
    """
    offline_statuses = [Device.OFFLINE, Device.OFFLINING]
    for device in queryset.filter(status__in=offline_statuses):
        if not device.can_admin(request.user):
            continue
        device.put_into_online_mode(request.user, "admin action", True)


online_action_without_health_check.short_description = (
    "take online without manual health check"
)
def retire_action(modeladmin, request, queryset):  # pylint: disable=unused-argument
    """Admin action: retire the selected devices.

    For each selected device the requesting user can administer, record a
    DeviceStateTransition from the current status to RETIRED, then store
    the new status on the device. Devices the user cannot administer are
    skipped.
    """
    for device in queryset:
        if not device.can_admin(request.user):
            continue
        new_status = device.RETIRED
        # objects.create() already persists the new row; the extra .save()
        # the code used to chain here only issued a redundant second write.
        DeviceStateTransition.objects.create(
            created_by=request.user, device=device, old_state=device.status,
            new_state=new_status, message="retiring", job=None)
        device.status = new_status
        device.save()


retire_action.short_description = "retire"
def cancel_action(modeladmin, request, queryset): # pylint: disable=unused-argument
for testjob in queryset:
if testjob.can_cancel(request.user):
......@@ -102,18 +62,9 @@ def cancel_action(modeladmin, request, queryset): # pylint: disable=unused-argu
cancel_action.short_description = 'cancel selected jobs'
def health_unknown(modeladmin, request, queryset):  # pylint: disable=unused-argument
    """Admin action: reset health_status to HEALTH_UNKNOWN.

    Only devices whose health_status is currently HEALTH_PASS are touched;
    each one is saved individually.
    """
    passing_devices = queryset.filter(health_status=Device.HEALTH_PASS)
    for device in passing_devices:
        device.health_status = Device.HEALTH_UNKNOWN
        device.save()


health_unknown.short_description = "set health_status to unknown"
class ActiveDevicesFilter(admin.SimpleListFilter):
title = 'Active devices'
parameter_name = 'status'
parameter_name = 'state'
def lookups(self, request, model_admin):
return (
......@@ -123,28 +74,9 @@ class ActiveDevicesFilter(admin.SimpleListFilter):
def queryset(self, request, queryset):
if self.value() == 'NoRetired':
return queryset.exclude(status=Device.RETIRED).order_by('hostname')
return queryset.exclude(health=Device.HEALTH_RETIRED).order_by('hostname')
if self.value() == 'CurrentJob':
return queryset.filter(current_job__isnull=False).order_by('hostname')
class RequestedDeviceFilter(admin.SimpleListFilter):
    """Admin list filter selecting rows by the hostname of the requested device."""

    title = 'Requested Device (except retired)'
    parameter_name = 'requested_device'

    def lookups(self, request, model_admin):
        """Return (value, label) pairs for every non-retired device, sorted by label."""
        devices = Device.objects.exclude(status=Device.RETIRED).order_by('hostname')
        choices = [(str(device.hostname), device.hostname) for device in devices]
        return sorted(choices, key=lambda choice: choice[1])

    def queryset(self, request, queryset):
        """Filter on the chosen hostname, or just order by hostname when none is chosen."""
        if not self.value():
            return queryset.order_by('requested_device__hostname')
        return queryset.filter(requested_device__hostname=self.value())
return queryset.filter(state__in=[Device.STATE_RESERVED, Device.STATE_RUNNING]).order_by('hostname')
class ActualDeviceFilter(admin.SimpleListFilter):
......@@ -153,7 +85,7 @@ class ActualDeviceFilter(admin.SimpleListFilter):
def lookups(self, request, model_admin):
list_of_types = []
queryset = Device.objects.exclude(status=Device.RETIRED).order_by('hostname')
queryset = Device.objects.exclude(health=Device.HEALTH_RETIRED).order_by('hostname')
for dev_type in queryset:
list_of_types.append(
(str(dev_type.hostname), dev_type.hostname)
......@@ -205,11 +137,9 @@ class RequestedDeviceTypeFilter(admin.SimpleListFilter):
class DeviceAdmin(admin.ModelAdmin):
actions = [online_action, online_action_without_health_check,
offline_action, health_unknown, retire_action]
list_filter = (DeviceTypeFilter, 'status', ActiveDevicesFilter,
'health_status', 'worker_host')
raw_id_fields = ['current_job', 'last_health_report_job']
list_filter = (DeviceTypeFilter, 'state', ActiveDevicesFilter,
'health', 'worker_host')
raw_id_fields = ['last_health_report_job']
def has_health_check(self, obj):
return bool(obj.get_health_check())
......@@ -240,15 +170,15 @@ class DeviceAdmin(admin.ModelAdmin):
('Device owner', {
'fields': (('user', 'group'), ('physical_owner', 'physical_group'), 'is_public', 'is_pipeline')}),
('Status', {
'fields': (('status', 'health_status'), ('last_health_report_job', 'current_job'))}),
'fields': (('state', 'health'), ('last_health_report_job', 'current_job'))}),
('Advanced properties', {
'fields': ('description', 'tags', ('device_dictionary_jinja')),
'classes': ('collapse', )
}),
)
readonly_fields = ('device_dictionary_jinja', )
readonly_fields = ('device_dictionary_jinja', 'state', 'current_job')
list_display = ('hostname', 'device_type', 'current_job', 'worker_host',
'status', 'health_status', 'has_health_check',
'state', 'health', 'has_health_check',
'health_check_enabled', 'is_public', 'is_pipeline',
'valid_device', 'exclusive_device')
search_fields = ('hostname', 'device_type__name')
......@@ -270,48 +200,30 @@ class VisibilityForm(forms.ModelForm):
class TestJobAdmin(admin.ModelAdmin):
def requested_device_hostname(self, obj):
return '' if obj.requested_device is None else obj.requested_device.hostname
requested_device_hostname.short_description = 'Requested device'
def requested_device_type_name(self, obj):
return '' if obj.requested_device_type is None else obj.requested_device_type
requested_device_type_name.short_description = 'Request device type'
form = VisibilityForm
actions = [cancel_action]
list_filter = ('status', RequestedDeviceTypeFilter, RequestedDeviceFilter, ActualDeviceFilter)
list_filter = ('state', RequestedDeviceTypeFilter, ActualDeviceFilter)
fieldsets = (
('Owner', {
'fields': ('user', 'group', 'submitter', 'submit_token', 'is_public', 'visibility', 'viewing_groups')}),
'fields': ('user', 'group', 'submitter', 'is_public', 'visibility', 'viewing_groups')}),
('Request', {
'fields': ('requested_device', 'requested_device_type', 'priority', 'health_check')}),
'fields': ('requested_device_type', 'priority', 'health_check')}),
('Advanced properties', {
'fields': ('description', 'tags', 'sub_id', 'target_group')}),
('Current status', {
'fields': ('actual_device', 'status')}),
'fields': ('actual_device', 'state', 'health')}),
('Results & Failures', {
'fields': ('failure_tags', 'failure_comment', '_results_link')}),
)
list_display = ('id', 'status', 'submitter', 'requested_device_type_name', 'requested_device_hostname',
readonly_fields = ('state', )
list_display = ('id', 'state', 'health', 'submitter', 'requested_device_type_name',
'actual_device', 'health_check', 'submit_time', 'start_time', 'end_time')
ordering = ['-submit_time']
class DeviceStateTransitionAdmin(admin.ModelAdmin):
    """Admin configuration for DeviceStateTransition records."""

    raw_id_fields = ['job']
    list_filter = ('device__hostname', )
    list_display = ('device_hostname', 'old_state', 'new_state', 'created_on')
    fieldsets = (
        (
            'State',
            {'fields': ('device', 'old_state', 'new_state')},
        ),
        (
            'Metadata',
            {'fields': ('created_by', 'job', 'message')},
        ),
    )

    def device_hostname(self, obj):
        """Return the hostname of the device the transition belongs to."""
        transition_device = obj.device
        return transition_device.hostname
def disable_health_check_action(modeladmin, request, queryset):  # pylint: disable=unused-argument
    """Admin action: disable health checks for every selected row.

    Performs a single bulk update on the queryset.
    """
    # The flag is named "disable_health_check", so disabling means setting it
    # to True; the previous value of False re-enabled health checks instead.
    queryset.update(disable_health_check=True)
......@@ -393,7 +305,6 @@ class TagAdmin(admin.ModelAdmin):
admin.site.register(Device, DeviceAdmin)
admin.site.register(DeviceStateTransition, DeviceStateTransitionAdmin)
admin.site.register(DeviceType, DeviceTypeAdmin)
admin.site.register(TestJob, TestJobAdmin)
admin.site.register(Tag, TagAdmin)
......
......@@ -6,11 +6,11 @@ import sys
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.db.models import Count, Q
from django.db import transaction
from linaro_django_xmlrpc.models import ExposedAPI
from lava_scheduler_app.models import (
Device,
DeviceType,
DeviceStateTransition,
JSONDataError,
DevicesUnavailableException,
TestJob,
......@@ -56,6 +56,36 @@ def check_superuser(f):
return wrapper
def build_job_status_display(state, health):
    """Return the status label for a job from its state and health.

    Submitted, scheduling and scheduled jobs are all reported as
    "Submitted"; running and canceling jobs map to "Running" and
    "Canceling". For any other state the label is derived from the health:
    "Complete", "Incomplete" (unknown or incomplete health) or "Canceled".
    """
    pending_states = (
        TestJob.STATE_SUBMITTED,
        TestJob.STATE_SCHEDULING,
        TestJob.STATE_SCHEDULED,
    )
    if state in pending_states:
        return "Submitted"
    if state == TestJob.STATE_RUNNING:
        return "Running"
    if state == TestJob.STATE_CANCELING:
        return "Canceling"

    # Any remaining state: the label depends on the health only.
    if health == TestJob.HEALTH_COMPLETE:
        return "Complete"
    if health in (TestJob.HEALTH_UNKNOWN, TestJob.HEALTH_INCOMPLETE):
        return "Incomplete"
    return "Canceled"
def build_device_status_display(state, health):
    """Return the status label for a device from its state and health.

    A reserved device is "Reserved" and any other non-idle device is
    "Running". An idle device is "Idle" when its health is good or unknown,
    "Retired" when its health is retired, and "Offline" otherwise.
    """
    if state != Device.STATE_IDLE:
        return "Reserved" if state == Device.STATE_RESERVED else "Running"

    # Idle device: the label depends on the health only.
    if health in (Device.HEALTH_GOOD, Device.HEALTH_UNKNOWN):
        return "Idle"
    if health == Device.HEALTH_RETIRED:
        return "Retired"
    return "Offline"
class SchedulerAPI(ExposedAPI):
def submit_job(self, job_data):
......@@ -163,26 +193,31 @@ class SchedulerAPI(ExposedAPI):
if not job_id:
raise xmlrpclib.Fault(400, "Bad request: TestJob id was not "
"specified.")
try:
job = get_restricted_job(self.user, job_id)
except PermissionDenied:
raise xmlrpclib.Fault(
401, "Permission denied for user to job %s" % job_id)
except TestJob.DoesNotExist:
raise xmlrpclib.Fault(404, "Specified job not found.")
if job.status > TestJob.RUNNING:
# Don't do anything for jobs that ended already
return True
if not job.can_cancel(self.user):
raise xmlrpclib.Fault(403, "Permission denied.")
if job.is_multinode:
multinode_jobs = TestJob.objects.filter(
target_group=job.target_group)
for multinode_job in multinode_jobs:
multinode_job.cancel(self.user)
else:
job.cancel(self.user)
with transaction.atomic():
try:
job = get_restricted_job(self.user, job_id, for_update=True)
except PermissionDenied:
raise xmlrpclib.Fault(
401, "Permission denied for user to job %s" % job_id)
except TestJob.DoesNotExist:
raise xmlrpclib.Fault(404, "Specified job not found.")
if job.state in [TestJob.STATE_CANCELING, TestJob.STATE_FINISHED]:
# Don't do anything for jobs that are already canceling or finished
return True
if not job.can_cancel(self.user):
raise xmlrpclib.Fault(403, "Permission denied.")
if job.is_multinode:
multinode_jobs = TestJob.objects.select_for_update().filter(
target_group=job.target_group)
for multinode_job in multinode_jobs:
multinode_job.go_state_canceling()
multinode_job.save()
else:
job.go_state_canceling()
job.save()
return True
def validate_yaml(self, yaml_string):
......@@ -282,18 +317,16 @@ class SchedulerAPI(ExposedAPI):
device hostname, device type, device state, current running job id and
if device is pipeline. For example:
[['panda01', 'panda', 'running', 164, False], ['qemu01', 'qemu', 'idle', None, True]]
[['panda01', 'panda', 'running', 'good', 164, False], ['qemu01', 'qemu', 'idle', 'unknown', None, True]]
"""
devices_list = []
for dev in Device.objects.all():
for dev in Device.objects.exclude(health=Device.HEALTH_RETIRED):
if not dev.is_visible_to(self.user):
continue
if dev.status == Device.RETIRED:
continue
devices_list.append(dev)
return [list((dev.hostname, dev.device_type.name, Device.STATUS_CHOICES[dev.status][1].lower(), dev.current_job.pk if dev.current_job else None, dev.is_pipeline))
return [[dev.hostname, dev.device_type.name, build_device_status_display(dev.state, dev.health), dev.current_job().pk if dev.current_job() else None, dev.is_pipeline]
for dev in devices_list]
def all_device_types(self):
......@@ -404,10 +437,9 @@ class SchedulerAPI(ExposedAPI):
403, "DeviceType '%s' not available to user '%s'." %
(device_type, self.user)
)
job_qs = TestJob.objects.filter(
status__in=(TestJob.COMPLETE, TestJob.INCOMPLETE),
requested_device_type=dt)\
.order_by('-id')
job_qs = TestJob.objects.filter(state=TestJob.STATE_FINISHED) \
.filter(requested_device_type=dt) \
.order_by('-id')
if restrict_to_user:
job_qs = job_qs.filter(submitter=self.user)
job_list = []
......@@ -415,7 +447,7 @@ class SchedulerAPI(ExposedAPI):
job_dict = {
"id": job.id,
"description": job.description,
"status": job.get_status_display(),
"status": build_job_status_display(job.state, job.health),
"device": job.actual_device.hostname,
}
if not job.can_view(self.user):
......@@ -485,10 +517,9 @@ class SchedulerAPI(ExposedAPI):
403, "Device '%s' not available to user '%s'." %
(device, self.user)
)
job_qs = TestJob.objects.filter(
status__in=(TestJob.COMPLETE, TestJob.INCOMPLETE),
actual_device=device_obj)\
.order_by('-id')
job_qs = TestJob.objects.filter(state=TestJob.STATE_FINISHED) \
.filter(actual_device=device_obj) \
.order_by('-id')
if restrict_to_user:
job_qs = job_qs.filter(submitter=self.user)
job_list = []
......@@ -496,7 +527,7 @@ class SchedulerAPI(ExposedAPI):
job_dict = {
"id": job.id,
"description": job.description,
"status": job.get_status_display(),
"status": build_job_status_display(job.state, job.health),
}
if not job.can_view(self.user):
job_dict["id"] = None
......@@ -586,26 +617,15 @@ class SchedulerAPI(ExposedAPI):
device_dict = {}
if device.is_visible_to(self.user):
device_dict["hostname"] = device.hostname
device_dict["status"] = Device.STATUS_CHOICES[device.status][1].lower()
device_dict["status"] = build_device_status_display(device.state, device.health)
device_dict["job"] = None
device_dict["offline_since"] = None
device_dict["offline_by"] = None
device_dict["is_pipeline"] = device.is_pipeline
if device.current_job:
device_dict["job"] = device.current_job.pk
if device.status == Device.OFFLINE:
device_dict["offline_since"] = ""
device_dict["offline_by"] = ""
try:
last_transition = device.transitions.latest('created_on')
if last_transition.new_state == Device.OFFLINE:
device_dict["offline_since"] = str(last_transition.created_on)
if last_transition.created_by:
device_dict["offline_by"] = last_transition.created_by.username
except (Device.DoesNotExist, DeviceStateTransition.DoesNotExist):
pass
current_job = device.current_job()
if current_job is not None:
device_dict["job"] = current_job.pk
else:
raise xmlrpclib.Fault(
403, "Permission denied for user to access %s information." % hostname
......@@ -648,18 +668,20 @@ class SchedulerAPI(ExposedAPI):
raise xmlrpclib.Fault(
400, "Bad request: Reason was not specified."
)
try:
device = Device.objects.get(hostname=hostname)
except Device.DoesNotExist:
raise xmlrpclib.Fault(
404, "Device '%s' was not found." % hostname
)
if device.can_admin(self.user):
device.put_into_maintenance_mode(self.user, reason, notify)
else:
raise xmlrpclib.Fault(
403, "Permission denied for user to put %s into maintenance mode." % hostname
)
with transaction.atomic():
try:
device = Device.objects.select_for_update().get(hostname=hostname)
except Device.DoesNotExist:
raise xmlrpclib.Fault(
404, "Device '%s' was not found." % hostname
)
if device.can_admin(self.user):
device.health = Device.HEALTH_MAINTENANCE
device.save()
else:
raise xmlrpclib.Fault(
403, "Permission denied for user to put %s into maintenance mode." % hostname
)
def put_into_online_mode(self, hostname, reason, skip_health_check=False):
"""
......@@ -697,18 +719,20 @@ class SchedulerAPI(ExposedAPI):
raise xmlrpclib.Fault(
400, "Bad request: Reason was not specified."
)
try:
device = Device.objects.get(hostname=hostname)
except Device.DoesNotExist:
raise xmlrpclib.Fault(
404, "Device '%s' was not found." % hostname
)
if device.can_admin(self.user):
device.put_into_online_mode(self.user, reason, skip_health_check)
else:
raise xmlrpclib.Fault(
403, "Permission denied for user to put %s into online mode." % hostname
)
with transaction.atomic():
try:
device = Device.objects.select_for_update().get(hostname=hostname)
except Device.DoesNotExist:
raise xmlrpclib.Fault(
404, "Device '%s' was not found." % hostname
)
if device.can_admin(self.user):
device.health = Device.HEALTH_UNKNOWN
device.save()
else:
raise xmlrpclib.Fault(
403, "Permission denied for user to put %s into online mode." % hostname
)
def pending_jobs_by_device_type(self):
"""
......@@ -735,9 +759,9 @@ class SchedulerAPI(ExposedAPI):
pending_jobs_by_device = {}
jobs_res = TestJob.objects.filter(status=TestJob.SUBMITTED)\
.values_list('requested_device_type_id')\
.annotate(pending_jobs=(Count('id')))
jobs_res = TestJob.objects.filter(state=TestJob.STATE_SUBMITTED) \
.values_list('requested_device_type_id')\
.annotate(pending_jobs=(Count('id')))
jobs = {}
jobs_hash = dict(jobs_res)
for job in jobs_hash:
......@@ -774,14 +798,13 @@ class SchedulerAPI(ExposedAPI):
the user is authenticated with a username and token.
The elements available in XML-RPC structure include:
_results_link, _state, submitter_id, submit_token_id, is_pipeline,
id, failure_comment, multinode_definition, user_id,
priority, _actual_device_cache, original_definition,
status, health_check, description, admin_notifications, start_time,
target_group, visibility, requested_device_id, pipeline_compatibility,
submit_time, is_public, _old_status, actual_device_id, definition,
sub_id, requested_device_type_id, end_time,
group_id, absolute_url, submitter_username
_results_link, _state, submitter_id, is_pipeline, id, failure_comment,
multinode_definition, user_id, priority, _actual_device_cache,
original_definition, status, health_check, description,
admin_notifications, start_time, target_group, visibility,
pipeline_compatibility, submit_time, is_public, _old_status,
actual_device_id, definition, sub_id, requested_device_type_id,
end_time, group_id, absolute_url, submitter_username
"""
self._authenticate()
if not job_id:
......@@ -789,7 +812,7 @@ class SchedulerAPI(ExposedAPI):
"specified.")
try:
job = get_restricted_job(self.user, job_id)
job.status = job.get_status_display()
job.status = build_job_status_display(job.state, job.health)
job.submitter_username = job.submitter.username
job.absolute_url = job.get_absolute_url()
except PermissionDenied:
......@@ -849,7 +872,7 @@ class SchedulerAPI(ExposedAPI):
if job.is_pipeline:
job_status.update({
'job_status': job.get_status_display(),
'job_status': build_job_status_display(job.state, job.health),
'bundle_sha1': ""
})
return job_status
......@@ -863,7 +886,7 @@ class SchedulerAPI(ExposedAPI):
pass
job_status.update({
'job_status': job.get_status_display(),
'job_status': build_job_status_display(job.state, job.health),
'bundle_sha1': bundle_sha1
})
......@@ -911,7 +934,7 @@ class SchedulerAPI(ExposedAPI):
raise xmlrpclib.Fault(400, "Bad request: needs to be a list of integers or floats")
jobs = TestJob.objects.filter(
Q(id__in=job_id_list) | Q(sub_id__in=job_id_list)).select_related(
'actual_device', 'requested_device', 'requested_device_type')
'actual_device', 'requested_device_type')
for job in jobs:
device_type = job.job_device_type()
if not job.can_view(self.user) or not job.is_accessible_by(self.user) and not self.user.is_superuser:
......@@ -920,42 +943,9 @@ class SchedulerAPI(ExposedAPI):