Commit 180cb3c1 authored by Antonio Terceiro's avatar Antonio Terceiro
Browse files

lava_dispatcher_host: add support for docker device sharing under cgroups v2

Under cgroups v2, device access control is done with BPF programs only.
When docker creates a container, it already attaches a BPF program to
that container cgroup. lava-dispather-host replaces that BPF program
with one of its own, that allows the regular list of devices containers
can usually access (/dev/null, /dev/zero etc), plus all the devices
shared with the container. Subsequent device sharing with the same
container overrides that BPF program with a new one.

cgroups v2 is the default on Debian 11 (bullseye), so in there we need
python3-bpfcc >= 0.21. In Debian 10 (buster, base-files << 1), we don't
need python3-bpfcc, bpftool and the kernel headers, since the
corresponding code path will not be used anyway.

Fixes: https://git.lavasoftware.org/lava/lava/-/issues/467
parent 384b70d2
......@@ -76,7 +76,11 @@ Description: Linaro Automated Validation Architecture dispatcher
Package: lava-dispatcher-host
Architecture: all
Depends: lava-common (= ${binary:Version}),
base-files (<< 11) | python3-bpfcc (>= 0.21),
base-files (<< 11) | linux-headers-amd64 | linux-headers-arm64,
base-files (<< 11) | bpftool,
python3,
python3-jinja2,
python3-requests,
python3-pyudev (>= 0.21),
${misc:Depends},
......
......@@ -19,11 +19,13 @@ import logging.handlers
import os
import stat
import subprocess
from pathlib import Path
import pyudev
from lava_common.compat import yaml_dump, yaml_load
from lava_common.constants import DISPATCHER_DOWNLOAD_DIR as JOBS_DIR
from lava_common.exceptions import InfrastructureError
from lava_dispatcher_host.docker_devices import Device, DeviceFilter
context = pyudev.Context()
......@@ -70,7 +72,7 @@ def validate_device_info(device_info):
def share_device_with_container(options):
data = find_mapping(options)
data, job_id = find_mapping(options)
if not data:
return
container = data["container"]
......@@ -83,9 +85,9 @@ def share_device_with_container(options):
container_type = data["container_type"]
if container_type == "lxc":
share_device_with_container_lxc(container, device)
share_device_with_container_lxc(container, device, job_id=job_id)
elif container_type == "docker":
share_device_with_container_docker(container, device)
share_device_with_container_docker(container, device, job_id=job_id)
else:
raise InfrastructureError('Unsupported container type: "%s"' % container_type)
......@@ -95,8 +97,9 @@ def find_mapping(options):
data = load_mapping_data(mapping)
for item in data:
if match_mapping(item["device_info"], options):
return item
return None
job_id = str(Path(mapping).parent.name)
return item, job_id
return None, None
def load_mapping_data(filename):
......@@ -127,15 +130,17 @@ def log_sharing_device(device, container_type, container):
logger.info(f"Sharing {device} with {container_type} container {container}")
def share_device_with_container_lxc(container, node):
def share_device_with_container_lxc(container, node, job_id):
device = pyudev.Devices.from_device_file(context, node)
pass_device_into_container_lxc(container, node, device.device_links)
pass_device_into_container_lxc(container, node, device.device_links, job_id)
for child in device.children:
if child.device_node:
pass_device_into_container_lxc(child.device_node, child.device_links)
pass_device_into_container_lxc(
child.device_node, child.device_links, job_id
)
def pass_device_into_container_lxc(container, node, links=[]):
def pass_device_into_container_lxc(container, node, links=[], job_id=None):
try:
nodeinfo = os.stat(node)
uid = nodeinfo.st_uid
......@@ -159,24 +164,21 @@ def pass_device_into_container_lxc(container, node, links=[]):
)
def pass_device_into_container_docker(container, container_id, node, links=[]):
def pass_device_into_container_docker(
container, container_id, node, links=[], job_id=None
):
try:
nodeinfo = os.stat(node)
major = os.major(nodeinfo.st_rdev)
minor = os.minor(nodeinfo.st_rdev)
nodetype = "b" if stat.S_ISBLK(nodeinfo.st_mode) else "c"
devices_allow_file = (
"/sys/fs/cgroup/devices/docker/%s/devices.allow" % container_id
)
if not os.path.exists(devices_allow_file):
devices_allow_file = (
"/sys/fs/cgroup/devices/system.slice/docker-%s.scope/devices.allow"
% container_id
)
state = Path(JOBS_DIR) / job_id / (container_id + ".devices")
device_filter = DeviceFilter(container_id, state)
device_filter.add(Device(major, minor))
device_filter.apply()
device_filter.save(state)
with open(devices_allow_file, "w") as allow:
allow.write("a %d:%d rwm\n" % (major, minor))
except FileNotFoundError as exc:
logger.warning(
f"Cannot share {node} with docker container {container}: {exc.filename} not found"
......@@ -212,7 +214,7 @@ def pass_device_into_container_docker(container, container_id, node, links=[]):
)
def share_device_with_container_docker(container, node):
def share_device_with_container_docker(container, node, job_id=None):
log_sharing_device(node, "docker", container)
try:
container_id = subprocess.check_output(
......@@ -226,11 +228,11 @@ def share_device_with_container_docker(container, node):
device = pyudev.Devices.from_device_file(context, node)
pass_device_into_container_docker(
container, container_id, node, device.device_links
container, container_id, node, device.device_links, job_id
)
for child in device.children:
if child.device_node:
pass_device_into_container_docker(
container, container_id, child.device_node, child.device_links
container, container_id, child.device_node, child.device_links, job_id
)
# Copyright (C) 2021 Linaro Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import subprocess
try:
from bcc import BPF
from bcc import BPFAttachType
except ImportError:
# This can happen on Debian 10 and that's ok. The code path that uses this
# will only be used on Debian 11 +
pass
from dataclasses import dataclass
from jinja2 import Template
from pathlib import Path
from typing import Optional
from lava_common.exceptions import InfrastructureError
# XXX bcc.BPF should provide these (from include/uapi/linux/bpf.h in the kernel
# tree)
BPF_F_ALLOW_OVERRIDE = 1 << 0
BPF_F_ALLOW_MULTI = 1 << 1
BPF_F_REPLACE = 1 << 2
TEMPLATE = """
int lava_docker_device_access_control(struct bpf_cgroup_dev_ctx *ctx) {
bpf_trace_printk("Device access: major = %d, minor = %d", ctx->major, ctx->minor);
{% for device in devices %}
{% if device.minor is none %}
if (ctx->major == {{ device.major}}) {
return 1;
}
{% else %}
if (ctx->major == {{ device.major}} && ctx->minor == {{ device.minor }}) {
return 1;
}
{% endif %}
{% endfor %}
return 0;
}
"""
@dataclass(frozen=True)
class Device:
major: str
minor: str = None
def DeviceFilter(*args, **kwargs):
for klass in [DeviceFilterCGroupsV1, DeviceFilterCGroupsV2]:
if klass.detect():
return klass(*args, **kwargs)
raise InfrastructureError(
"Neither cgroups v1 nor v2 detected; can't share device with docker container"
)
class DeviceFilterCommon:
def __init__(self, container, state_file: Optional[Path] = None):
self.__devices__ = set()
if state_file:
self.load(state_file)
self.container_id = subprocess.check_output(
["docker", "inspect", "--format={{.Id}}", container], text=True
).strip()
def load(self, state: Path):
pass
def save(self, state: Path):
pass
def add(self, device: Device):
self.__devices__.add(device)
def apply(self):
pass
@classmethod
def detect(cls):
return False
class DeviceFilterCGroupsV1(DeviceFilterCommon):
@classmethod
def detect(cls):
dirs = [
"/sys/fs/cgroup/devices/docker",
"/sys/fs/cgroup/devices/system.slice",
]
for d in dirs:
if os.path.exists(d):
return True
return False
def apply(self):
devices_allow_file = (
f"/sys/fs/cgroup/devices/docker/{self.container_id}/devices.allow"
)
if not os.path.exists(devices_allow_file):
devices_allow_file = "/sys/fs/cgroup/devices/system.slice/docker-{self.container_id}.scope/devices.allow"
with open(devices_allow_file, "w") as allow:
for device in self.devices:
allow.write("a %d:%d rwm\n" % (device.major, device.minor))
class DeviceFilterCGroupsV2(DeviceFilterCommon):
@classmethod
def detect(cls):
return os.path.exists("/sys/fs/cgroup/system.slice")
DEFAULT_DEVICES = [
Device(1, 3), # /dev/null
Device(1, 5), # /dev/zero
Device(1, 7), # /dev/full
Device(1, 8), # /dev/random
Device(1, 9), # /dev/urandom
Device(5, 0), # /dev/tty
Device(5, 1), # /dev/console
Device(5, 2), # /dev/pts/ptmx
Device(10, 200), # /dev/net/tun
Device(136), # /dev/pts/[0-9]*
]
def __init__(self, container: str, state_file: Optional[Path] = None):
super().__init__(container, state_file)
self.__cgroup__ = (
f"/sys/fs/cgroup/system.slice/docker-{self.container_id}.scope"
)
@property
def devices(self):
return self.DEFAULT_DEVICES + list(self.__devices__)
def load(self, state_file: Path):
if not state_file.exists():
return
with state_file.open() as f:
for line in f.readlines():
major, minor = line.split()
self.add(Device(int(major), int(minor)))
def save(self, state_file):
with state_file.open("w") as f:
for device in self.__devices__:
f.write(f"{device.major} {device.minor}\n")
def apply(self):
existing = self.__get_existing_functions__()
fd = os.open(self.__cgroup__, os.O_RDONLY)
program = bytes(self.expand_template(), "utf-8")
bpf = BPF(text=program)
func = bpf.load_func("lava_docker_device_access_control", bpf.CGROUP_DEVICE)
bpf.attach_func(func, fd, BPFAttachType.CGROUP_DEVICE, BPF_F_ALLOW_MULTI)
os.close(fd)
for fid in existing:
subprocess.check_call(
[
"/usr/sbin/bpftool",
"cgroup",
"detach",
self.__cgroup__,
"device",
"id",
str(fid),
]
)
def __get_existing_functions__(self):
cmd = ["/usr/sbin/bpftool", "cgroup", "list", self.__cgroup__]
data = subprocess.run(cmd, text=True, stdout=subprocess.PIPE).stdout
result = []
for line in data.splitlines():
parts = line.split()
if len(parts) >= 2 and parts[1] == "device":
result.append(int(parts[0]))
return result
def expand_template(self):
template = Template(TEMPLATE)
return template.render(devices=self.devices)
import argparse
import sys
from . import Device, DeviceFilter
parser = argparse.ArgumentParser()
parser.add_argument("container")
parser.add_argument("major")
parser.add_argument("minor")
args = parser.parse_args(sys.argv[1:])
container, major, minor = args.container, args.major, args.minor
device_filter = DeviceFilter(container)
device_filter.add(Device(int(major), int(minor)))
device_filter.apply()
# Copyright (C) 2021 Linaro Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lava_dispatcher_host.docker_devices import Device, DeviceFilter
import pytest
try:
import bcc
has_bcc = True
except ImportError:
has_bcc = False
@pytest.fixture(autouse=True)
def check_output(mocker):
return mocker.patch("subprocess.check_output")
@pytest.fixture(autouse=True)
def run(mocker):
return mocker.patch("subprocess.run")
@pytest.fixture(autouse=True)
def check_call(mocker):
return mocker.patch("subprocess.check_call")
@pytest.fixture
def fd():
return 17
@pytest.fixture
def os_close(mocker):
return mocker.patch("os.close")
@pytest.fixture(autouse=True)
def os_open(mocker, fd):
return mocker.patch("os.open", return_value=fd)
class TestDeviceFilterCGroupsV2:
@pytest.fixture(autouse=True)
def cgroupsv2(self, mocker):
mocker.patch(
"lava_dispatcher_host.docker_devices.DeviceFilterCGroupsV1.detect",
return_value=False,
)
mocker.patch(
"lava_dispatcher_host.docker_devices.DeviceFilterCGroupsV2.detect",
return_value=True,
)
def test_basics(self):
f = DeviceFilter("foo")
dev_null = Device(1, 3)
assert dev_null in f.devices
def test_device_unique(self):
f = DeviceFilter("foo")
l = len(f.devices)
f.add(Device(10, 232))
f.add(Device(10, 232))
assert len(f.devices) == (l + 1)
def test_read_state(self, tmp_path):
state = tmp_path / "state"
state.write_text("10 232\n10 235")
f = DeviceFilter("foo", state)
assert Device(10, 232) in f.devices
assert Device(10, 235) in f.devices
def test_missing_state(self, tmp_path):
DeviceFilter("foo", tmp_path / "does-not-exist")
def test_save_state(self, tmp_path):
state = tmp_path / "state"
f = DeviceFilter("foobar")
f.add(Device(10, 232))
f.save(state)
assert state.read_text().strip() == "10 232"
def test_save_load_roundtrip(self, tmp_path):
state = tmp_path / "state"
f1 = DeviceFilter("foobar", state)
f1.add(Device(10, 232))
f1.save(state)
f2 = DeviceFilter("foobar", state)
assert f1.devices == f2.devices
f2.add(Device(189, 1))
f2.save(state)
f3 = DeviceFilter("foobar", state)
assert f1.devices != f3.devices
assert f2.devices == f3.devices
def test_get_existing_functions(self, run):
run.return_value.stdout = "\n".join(
[
"ID AttachType AttachFlags Name",
"93 device multi",
"94 ingress",
]
)
f = DeviceFilter("foo")
assert f.__get_existing_functions__() == [93]
def test_get_existing_functions_invalid_input(self, run):
run.return_value.stdout = ""
assert DeviceFilter("foobar").__get_existing_functions__() == []
run.return_value.stdout = "blah\n"
assert DeviceFilter("foobar").__get_existing_functions__() == []
@pytest.mark.skipif(not has_bcc, reason="bcc not available")
def test_apply(self, mocker, check_call, fd, os_close):
load_func = mocker.patch("bcc.BPF.load_func")
attach_func = mocker.patch("bcc.BPF.attach_func")
mocker.patch(
"lava_dispatcher_host.docker_devices.DeviceFilterCGroupsV2.__get_existing_functions__",
return_value=[99],
)
f = DeviceFilter("foobar")
f.add(Device(10, 232))
f.apply()
load_func.assert_called()
attach_func.assert_called()
detach = check_call.call_args[0][0]
assert detach == [
"/usr/sbin/bpftool",
"cgroup",
"detach",
f.__cgroup__,
"device",
"id",
"99",
]
os_close.assert_called_with(fd)
def test_template(self):
device_filter = DeviceFilter("foobar")
device_filter.add(Device(11, 22))
program = device_filter.expand_template()
dev_null = "ctx->major == 1 && ctx->minor == 3"
assert dev_null in program
dev_something = "ctx->major == 11 && ctx->minor == 22"
assert dev_something in program
......@@ -71,7 +71,9 @@ def pass_device_lxc(mocker):
def test_simple_share_device_with_container(mocker, pass_device_lxc, device_links):
add_device_container_mapping("1", {"serial_number": "1234567890"}, "mycontainer")
share_device_with_container(Namespace(device="foo/bar", serial_number="1234567890"))
pass_device_lxc.assert_called_once_with("mycontainer", "/dev/foo/bar", device_links)
pass_device_lxc.assert_called_once_with(
"mycontainer", "/dev/foo/bar", device_links, "1"
)
def test_mapping_with_serial_number_but_called_with_vendor_product_id_too(
......@@ -96,7 +98,9 @@ def test_mapping_with_serial_number_but_called_with_vendor_product_id_too(
)
)
pass_device_lxc.assert_called_once_with("mycontainer", "/dev/foo/bar", device_links)
pass_device_lxc.assert_called_once_with(
"mycontainer", "/dev/foo/bar", device_links, "1"
)
def test_two_concurrent_jobs(mocker, pass_device_lxc, device_links):
......@@ -104,7 +108,9 @@ def test_two_concurrent_jobs(mocker, pass_device_lxc, device_links):
add_device_container_mapping("2", {"serial_number": "9876543210"}, "container2")
share_device_with_container(Namespace(device="baz/qux", serial_number="9876543210"))
pass_device_lxc.assert_called_once_with("container2", "/dev/baz/qux", device_links)
pass_device_lxc.assert_called_once_with(
"container2", "/dev/baz/qux", device_links, "2"
)
def test_no_device_found(mocker):
......@@ -128,7 +134,7 @@ def test_map_by_vendor_id_and_product_id(mocker, pass_device_lxc, device_links):
)
)
pass_device_lxc.assert_called_once_with(
"container1", "/dev/bus/usb/001/099", device_links
"container1", "/dev/bus/usb/001/099", device_links, "1"
)
......@@ -159,7 +165,7 @@ def test_only_adds_slash_dev_if_needed(mocker):
share_device_with_container(
Namespace(device="/dev/foo/bar", serial_number="1234567890")
)
share.assert_called_once_with("mycontainer", "/dev/foo/bar")
share.assert_called_once_with("mycontainer", "/dev/foo/bar", job_id="1")
def test_second_mapping_does_not_invalidate_first(mocker):
......@@ -169,7 +175,7 @@ def test_second_mapping_does_not_invalidate_first(mocker):
share_device_with_container(
Namespace(device="/dev/foo/bar", serial_number="1234567890")
)
share.assert_called_once_with("mycontainer1", "/dev/foo/bar")
share.assert_called_once_with("mycontainer1", "/dev/foo/bar", job_id="1")
def test_two_devices_two_containers(mocker):
......@@ -179,13 +185,13 @@ def test_two_devices_two_containers(mocker):
share_device_with_container(
Namespace(device="/dev/foo/bar", serial_number="1234567890")
)
share.assert_called_once_with("mycontainer1", "/dev/foo/bar")
share.assert_called_once_with("mycontainer1", "/dev/foo/bar", job_id="1")
share.reset_mock()
share_device_with_container(
Namespace(device="/dev/foo/bar", serial_number="badbeeb00c")
)
share.assert_called_once_with("mycontainer2", "/dev/foo/bar")
share.assert_called_once_with("mycontainer2", "/dev/foo/bar", job_id="1")
def test_device_plus_parent(mocker):
......@@ -214,13 +220,13 @@ def test_device_plus_parent(mocker):
share_device_with_container(
Namespace(device="/dev/foo/bar", serial_number="1234567890")
)
share.assert_called_once_with("mycontainer1", "/dev/foo/bar")
share.assert_called_once_with("mycontainer1", "/dev/foo/bar", job_id="1")
share.reset_mock()
share_device_with_container(
Namespace(device="/dev/foo/bar", vendor_id="1234", product_id="3456")
)
share.assert_called_once_with("mycontainer2", "/dev/foo/bar")
share.assert_called_once_with("mycontainer2", "/dev/foo/bar", job_id="1")
share.reset_mock()
......
......@@ -31,10 +31,16 @@ def stat(mocker):
return s