Commit c1e1601e authored by Lennart Poettering's avatar Lennart Poettering
Browse files

dbus: send out signals when units/jobs come, go and change

parent ab8ea244
......@@ -110,3 +110,8 @@ systemadm_SOURCES = \
systemadm_CPPFLAGS = $(AM_CPPFLAGS) $(DBUSGLIB_CFLAGS) $(GTK_CFLAGS)
systemadm_LDADD = $(DBUSGLIB_LIBS) $(GTK_LIBS)
CLEANFILES = \
systemd-interfaces.c \
systemctl.c \
systemadm.c
......@@ -29,6 +29,7 @@ static const char introspection[] =
"<node>"
" <interface name=\"org.freedesktop.systemd1.Job\">"
" <method name=\"Cancel\"/>"
" <signal name=\"Changed\"/>"
" <property name=\"Id\" type=\"u\" access=\"read\"/>"
" <property name=\"Unit\" type=\"(so)\" access=\"read\"/>"
" <property name=\"JobType\" type=\"s\" access=\"read\"/>"
......@@ -173,3 +174,94 @@ static DBusHandlerResult bus_job_message_handler(DBusConnection *connection, DB
const DBusObjectPathVTable bus_job_vtable = {
.message_function = bus_job_message_handler
};
void bus_job_send_change_signal(Job *j) {
char *p = NULL;
DBusMessage *m = NULL;
assert(j);
assert(j->in_dbus_queue);
LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
j->in_dbus_queue = false;
if (set_isempty(j->manager->subscribed))
return;
if (!(p = job_dbus_path(j)))
goto oom;
if (j->sent_dbus_new_signal) {
/* Send a change signal */
if (!(m = dbus_message_new_signal(p, "org.freedesktop.systemd1.Job", "Changed")))
goto oom;
} else {
/* Send a new signal */
if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "JobNew")))
goto oom;
if (!dbus_message_append_args(m,
DBUS_TYPE_UINT32, &j->id,
DBUS_TYPE_OBJECT_PATH, &p,
DBUS_TYPE_INVALID))
goto oom;
}
if (!dbus_connection_send(j->manager->bus, m, NULL))
goto oom;
free(p);
dbus_message_unref(m);
j->sent_dbus_new_signal = true;
return;
oom:
free(p);
if (m)
dbus_message_unref(m);
log_error("Failed to allocate job change signal.");
}
void bus_job_send_removed_signal(Job *j) {
char *p = NULL;
DBusMessage *m = NULL;
assert(j);
if (set_isempty(j->manager->subscribed) || !j->sent_dbus_new_signal)
return;
if (!(p = job_dbus_path(j)))
goto oom;
if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "JobRemoved")))
goto oom;
if (!dbus_message_append_args(m,
DBUS_TYPE_UINT32, &j->id,
DBUS_TYPE_OBJECT_PATH, &p,
DBUS_TYPE_INVALID))
goto oom;
if (!dbus_connection_send(j->manager->bus, m, NULL))
goto oom;
free(p);
dbus_message_unref(m);
return;
oom:
free(p);
if (m)
dbus_message_unref(m);
log_error("Failed to allocate job remove signal.");
}
......@@ -47,6 +47,24 @@
" <method name=\"ListJobs\">" \
" <arg name=\"jobs\" type=\"a(usssoo)\" direction=\"out\"/>" \
" </method>" \
" <method name=\"Subscribe\"/>" \
" <method name=\"Unsubscribe\"/>" \
" <signal name=\"UnitNew\">" \
" <arg name=\"id\" type=\"s\"/>" \
" <arg name=\"unit\" type=\"o\"/>" \
" </signal>" \
" <signal name=\"UnitRemoved\">" \
" <arg name=\"id\" type=\"s\"/>" \
" <arg name=\"unit\" type=\"o\"/>" \
" </signal>" \
" <signal name=\"JobNew\">" \
" <arg name=\"id\" type=\"u\"/>" \
" <arg name=\"job\" type=\"o\"/>" \
" </signal>" \
" <signal name=\"JobRemoved\">" \
" <arg name=\"id\" type=\"u\"/>" \
" <arg name=\"job\" type=\"o\"/>" \
" </signal>" \
" </interface>" \
BUS_PROPERTIES_INTERFACE \
BUS_INTROSPECTABLE_INTERFACE
......@@ -287,6 +305,31 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection
if (!dbus_message_iter_close_container(&iter, &sub))
goto oom;
} else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1", "Subscribe")) {
char *client;
if (!(client = strdup(dbus_message_get_sender(message))))
goto oom;
r = set_put(m->subscribed, client);
if (r < 0)
return bus_send_error_reply(m, message, NULL, r);
if (!(reply = dbus_message_new_method_return(message)))
goto oom;
} else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1", "Unsubscribe")) {
char *client;
if (!(client = set_remove(m->subscribed, (char*) dbus_message_get_sender(message))))
return bus_send_error_reply(m, message, NULL, -ENOENT);
free(client);
if (!(reply = dbus_message_new_method_return(message)))
goto oom;
} else if (dbus_message_is_method_call(message, "org.freedesktop.DBus.Introspectable", "Introspect")) {
char *introspection = NULL;
FILE *f;
......
......@@ -44,6 +44,7 @@ static const char introspection[] =
" <arg name=\"mode\" type=\"s\" direction=\"in\"/>"
" <arg name=\"job\" type=\"o\" direction=\"out\"/>"
" </method>"
" <signal name=\"Changed\"/>"
" <property name=\"Id\" type=\"s\" access=\"read\"/>"
" <property name=\"Description\" type=\"s\" access=\"read\"/>"
" <property name=\"LoadState\" type=\"s\" access=\"read\"/>"
......@@ -325,3 +326,98 @@ static DBusHandlerResult bus_unit_message_handler(DBusConnection *connection, D
const DBusObjectPathVTable bus_unit_vtable = {
.message_function = bus_unit_message_handler
};
void bus_unit_send_change_signal(Unit *u) {
char *p = NULL;
DBusMessage *m = NULL;
assert(u);
assert(u->meta.in_dbus_queue);
LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
u->meta.in_dbus_queue = false;
if (set_isempty(u->meta.manager->subscribed))
return;
if (!(p = unit_dbus_path(u)))
goto oom;
if (u->meta.sent_dbus_new_signal) {
/* Send a change signal */
if (!(m = dbus_message_new_signal(p, "org.freedesktop.systemd1.Unit", "Changed")))
goto oom;
} else {
const char *id;
/* Send a new signal */
if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "UnitNew")))
goto oom;
id = unit_id(u);
if (!dbus_message_append_args(m,
DBUS_TYPE_STRING, &id,
DBUS_TYPE_OBJECT_PATH, &p,
DBUS_TYPE_INVALID))
goto oom;
}
if (!dbus_connection_send(u->meta.manager->bus, m, NULL))
goto oom;
free(p);
dbus_message_unref(m);
u->meta.sent_dbus_new_signal = true;
return;
oom:
free(p);
if (m)
dbus_message_unref(m);
log_error("Failed to allocate unit change/new signal.");
}
void bus_unit_send_removed_signal(Unit *u) {
char *p = NULL;
DBusMessage *m = NULL;
const char *id;
assert(u);
if (set_isempty(u->meta.manager->subscribed) || !u->meta.sent_dbus_new_signal)
return;
if (!(p = unit_dbus_path(u)))
goto oom;
if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "UnitRemoved")))
goto oom;
id = unit_id(u);
if (!dbus_message_append_args(m,
DBUS_TYPE_STRING, &id,
DBUS_TYPE_OBJECT_PATH, &p,
DBUS_TYPE_INVALID))
goto oom;
if (!dbus_connection_send(u->meta.manager->bus, m, NULL))
goto oom;
free(p);
dbus_message_unref(m);
return;
oom:
free(p);
if (m)
dbus_message_unref(m);
log_error("Failed to allocate unit remove signal.");
}
......@@ -276,11 +276,55 @@ static void bus_toggle_timeout(DBusTimeout *timeout, void *data) {
log_error("Failed to rearm timer: %s", strerror(-r));
}
void bus_dispatch(Manager *m) {
static DBusHandlerResult bus_message_filter(DBusConnection *connection, DBusMessage *message, void *data) {
Manager *m = data;
DBusError error;
assert(connection);
assert(message);
assert(m);
dbus_error_init(&error);
/* log_debug("Got D-Bus request: %s.%s() on %s", */
/* dbus_message_get_interface(message), */
/* dbus_message_get_member(message), */
/* dbus_message_get_path(message)); */
if (dbus_message_is_signal(message, DBUS_INTERFACE_LOCAL, "Disconnected")) {
log_error("Warning! D-Bus connection terminated.");
/* FIXME: we probably should restart D-Bus here */
} else if (dbus_message_is_signal(message, DBUS_INTERFACE_DBUS, "NameOwnerChanged")) {
const char *name, *old, *new;
if (!dbus_message_get_args(message, &error,
DBUS_TYPE_STRING, &name,
DBUS_TYPE_STRING, &old,
DBUS_TYPE_STRING, &new,
DBUS_TYPE_INVALID))
log_error("Failed to parse NameOwnerChanged message: %s", error.message);
else {
if (set_remove(m->subscribed, (char*) name))
log_debug("Subscription client vanished: %s (left: %u)", name, set_size(m->subscribed));
}
}
dbus_error_free(&error);
return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
}
unsigned bus_dispatch(Manager *m) {
assert(m);
if (!m->request_bus_dispatch)
return 0;
if (dbus_connection_dispatch(m->bus) == DBUS_DISPATCH_COMPLETE)
m->request_bus_dispatch = false;
return 1;
}
static int request_name(Manager *m) {
......@@ -328,6 +372,9 @@ int bus_init(Manager *m) {
if (m->bus)
return 0;
if (!(m->subscribed = set_new(string_hash_func, string_compare_func)))
return -ENOMEM;
dbus_connection_set_change_sigpipe(FALSE);
dbus_error_init(&error);
......@@ -343,7 +390,22 @@ int bus_init(Manager *m) {
!dbus_connection_set_timeout_functions(m->bus, bus_add_timeout, bus_remove_timeout, bus_toggle_timeout, m, NULL) ||
!dbus_connection_register_object_path(m->bus, "/org/freedesktop/systemd1", &bus_manager_vtable, m) ||
!dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/unit", &bus_unit_vtable, m) ||
!dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/job", &bus_job_vtable, m)) {
!dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/job", &bus_job_vtable, m) ||
!dbus_connection_add_filter(m->bus, bus_message_filter, m, NULL)) {
bus_done(m);
return -ENOMEM;
}
dbus_bus_add_match(m->bus,
"type='signal',"
"sender='"DBUS_SERVICE_DBUS"',"
"interface='"DBUS_INTERFACE_DBUS"',"
"path='"DBUS_PATH_DBUS"'",
&error);
if (dbus_error_is_set(&error)) {
log_error("Failed to register match: %s", error.message);
dbus_error_free(&error);
bus_done(m);
return -ENOMEM;
}
......@@ -371,6 +433,16 @@ void bus_done(Manager *m) {
dbus_connection_unref(m->bus);
m->bus = NULL;
}
if (m->subscribed) {
char *c;
while ((c = set_steal_first(m->subscribed)))
free(c);
set_free(m->subscribed);
m->subscribed = NULL;
}
}
DBusHandlerResult bus_default_message_handler(Manager *m, DBusMessage *message, const char*introspection, const BusProperty *properties) {
......
......@@ -59,7 +59,7 @@ typedef struct BusProperty {
int bus_init(Manager *m);
void bus_done(Manager *m);
void bus_dispatch(Manager *m);
unsigned bus_dispatch(Manager *m);
void bus_watch_event(Manager *m, Watch *w, int events);
void bus_timeout_event(Manager *m, Watch *w, int events);
......@@ -78,4 +78,10 @@ extern const DBusObjectPathVTable bus_manager_vtable;
extern const DBusObjectPathVTable bus_job_vtable;
extern const DBusObjectPathVTable bus_unit_vtable;
void bus_unit_send_change_signal(Unit *u);
void bus_unit_send_removed_signal(Unit *u);
void bus_job_send_change_signal(Job *j);
void bus_job_send_removed_signal(Job *j);
#endif
......@@ -212,6 +212,8 @@ static int device_process_new_device(Manager *m, struct udev_device *dev, bool u
device_set_state(DEVICE(u), DEVICE_AVAILABLE);
}
unit_add_to_dbus_queue(u);
return 0;
fail:
......
......@@ -25,9 +25,7 @@
- implement timer
- implement mount/automount
- more process attributes: cpu affinity, cpu scheduling
- implement automount
- create session/pgroup for child processes? handle input on console properly? interactive fsck? interactive luks password?
......
......@@ -55,6 +55,8 @@ void job_free(Job *j) {
/* Detach from next 'bigger' objects */
if (j->installed) {
bus_job_send_removed_signal(j);
if (j->unit->meta.job == j)
j->unit->meta.job = NULL;
......@@ -65,6 +67,12 @@ void job_free(Job *j) {
/* Detach from next 'smaller' objects */
manager_transaction_unlink_job(j->manager, j);
if (j->in_run_queue)
LIST_REMOVE(Job, run_queue, j->manager->run_queue, j);
if (j->in_dbus_queue)
LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
free(j);
}
......@@ -326,6 +334,7 @@ int job_run_and_invalidate(Job *j) {
return -EAGAIN;
j->state = JOB_RUNNING;
job_add_to_dbus_queue(j);
switch (j->type) {
......@@ -408,6 +417,7 @@ int job_finish_and_invalidate(Job *j, bool success) {
assert(j->installed);
log_debug("Job %s/%s finished, success=%s", unit_id(j->unit), job_type_to_string(j->type), yes_no(success));
job_add_to_dbus_queue(j);
/* Patch restart jobs so that they become normal start jobs */
if (success && (j->type == JOB_RESTART || j->type == JOB_TRY_RESTART)) {
......@@ -419,7 +429,7 @@ int job_finish_and_invalidate(Job *j, bool success) {
j->state = JOB_RUNNING;
j->type = JOB_START;
job_schedule_run(j);
job_add_to_run_queue(j);
return 0;
}
......@@ -463,15 +473,15 @@ int job_finish_and_invalidate(Job *j, bool success) {
/* Try to start the next jobs that can be started */
SET_FOREACH(other, u->meta.dependencies[UNIT_AFTER], i)
if (other->meta.job)
job_schedule_run(other->meta.job);
job_add_to_run_queue(other->meta.job);
SET_FOREACH(other, u->meta.dependencies[UNIT_BEFORE], i)
if (other->meta.job)
job_schedule_run(other->meta.job);
job_add_to_run_queue(other->meta.job);
return 0;
}
void job_schedule_run(Job *j) {
void job_add_to_run_queue(Job *j) {
assert(j);
assert(j->installed);
......@@ -482,6 +492,17 @@ void job_schedule_run(Job *j) {
j->in_run_queue = true;
}
void job_add_to_dbus_queue(Job *j) {
assert(j);
assert(j->installed);
if (j->in_dbus_queue)
return;
LIST_PREPEND(Job, dbus_queue, j->manager->dbus_job_queue, j);
j->in_dbus_queue = true;
}
char *job_dbus_path(Job *j) {
char *p;
......
......@@ -94,9 +94,12 @@ struct Job {
bool in_run_queue:1;
bool matters_to_anchor:1;
bool forced:1;
bool in_dbus_queue:1;
bool sent_dbus_new_signal:1;
LIST_FIELDS(Job, transaction);
LIST_FIELDS(Job, run_queue);
LIST_FIELDS(Job, dbus_queue);
LIST_HEAD(JobDependency, subject_list);
LIST_HEAD(JobDependency, object_list);
......@@ -126,7 +129,9 @@ bool job_type_is_conflicting(JobType a, JobType b);
bool job_is_runnable(Job *j);
void job_schedule_run(Job *j);
void job_add_to_run_queue(Job *j);
void job_add_to_dbus_queue(Job *j);
int job_run_and_invalidate(Job *j);
int job_finish_and_invalidate(Job *j, bool success);
......
......@@ -665,7 +665,8 @@ static int transaction_apply(Manager *m, JobMode mode) {
assert(!j->transaction_next);
assert(!j->transaction_prev);
job_schedule_run(j);
job_add_to_run_queue(j);
job_add_to_dbus_queue(j);
}
/* As last step, kill all remaining job dependencies. */
......@@ -946,14 +947,15 @@ Unit *manager_get_unit(Manager *m, const char *name) {
return hashmap_get(m->units, name);
}
void manager_dispatch_load_queue(Manager *m) {
unsigned manager_dispatch_load_queue(Manager *m) {
Meta *meta;
unsigned n = 0;
assert(m);
/* Make sure we are not run recursively */
if (m->dispatching_load_queue)
return;
return 0;
m->dispatching_load_queue = true;
......@@ -964,9 +966,11 @@ void manager_dispatch_load_queue(Manager *m) {
assert(meta->in_load_queue);
unit_load(UNIT(meta));
n++;
}
m->dispatching_load_queue = false;
return n;
}
int manager_load_unit(Manager *m, const char *path, Unit **_ret) {
......@@ -1004,6 +1008,8 @@ int manager_load_unit(Manager *m, const char *path, Unit **_ret) {
}
unit_add_to_load_queue(ret);
unit_add_to_dbus_queue(ret);
manager_dispatch_load_queue(m);
*_ret = ret;
......@@ -1045,11 +1051,12 @@ void manager_clear_jobs(Manager *m) {
job_free(j);
}
void manager_dispatch_run_queue(Manager *m) {
unsigned manager_dispatch_run_queue(Manager *m) {
Job *j;
unsigned n = 0;
if (m->dispatching_run_queue)
return;
return 0;
m->dispatching_run_queue = true;
......@@ -1058,9 +1065,42 @@ void manager_dispatch_run_queue(Manager *m) {
assert(j->in_run_queue);
job_run_and_invalidate(j);
n++;
}
m->dispatching_run_queue = false;
return n;
}
unsigned manager_dispatch_dbus_queue(Manager *m) {
Job *j;
Meta *meta;
unsigned n = 0;
assert(m);
if (m->dispatching_dbus_queue)
return 0;
m->dispatching_dbus_queue = true;
while ((meta = m->dbus_unit_queue)) {
Unit *u = (Unit*) meta;
assert(u->meta.in_dbus_queue);
bus_unit_send_change_signal(u);
n++;
}
while ((j = m->dbus_job_queue)) {
assert(j->in_dbus_queue);