Commit a567261a authored by Lennart Poettering's avatar Lennart Poettering
Browse files

dbus: send signals about jobs to the clients having created them...

dbus: send signals about jobs to the clients having created them unconditionally, and thus get rid of broadcast signals in most cases
parent 552e4331
......@@ -39,8 +39,6 @@
* systemctl daemon-reload is kaputt
* get rid of Subscribe() in systemctl
* Turn around negative options
* Add missing man pages: update systemd.1, finish daemon.7
......
......@@ -146,6 +146,32 @@ const DBusObjectPathVTable bus_job_vtable = {
.message_function = bus_job_message_handler
};
static int job_send_message(Job *j, DBusMessage *m) {
int r;
assert(j);
assert(m);
if (bus_has_subscriber(j->manager)) {
if ((r = bus_broadcast(j->manager, m)) < 0)
return r;
} else if (j->bus_client) {
/* If nobody is subscribed, we just send the message
* to the client which created the job */
assert(j->bus);
if (!dbus_message_set_destination(m, j->bus_client))
return -ENOMEM;
if (!dbus_connection_send(j->bus, m, NULL))
return -ENOMEM;
}
return 0;
}
void bus_job_send_change_signal(Job *j) {
char *p = NULL;
DBusMessage *m = NULL;
......@@ -156,7 +182,7 @@ void bus_job_send_change_signal(Job *j) {
LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
j->in_dbus_queue = false;
if (set_isempty(j->manager->subscribed)) {
if (!bus_has_subscriber(j->manager) && !j->bus_client) {
j->sent_dbus_new_signal = true;
return;
}
......@@ -182,7 +208,7 @@ void bus_job_send_change_signal(Job *j) {
goto oom;
}
if (bus_broadcast(j->manager, m) < 0)
if (job_send_message(j, m) < 0)
goto oom;
free(p);
......@@ -208,7 +234,7 @@ void bus_job_send_removed_signal(Job *j, bool success) {
assert(j);
if (set_isempty(j->manager->subscribed))
if (!bus_has_subscriber(j->manager) && !j->bus_client)
return;
if (!j->sent_dbus_new_signal)
......@@ -227,7 +253,7 @@ void bus_job_send_removed_signal(Job *j, bool success) {
DBUS_TYPE_INVALID))
goto oom;
if (bus_broadcast(j->manager, m) < 0)
if (job_send_message(j, m) < 0)
goto oom;
free(p);
......
......@@ -452,14 +452,25 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
} else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Subscribe")) {
char *client;
Set *s;
if (!(s = BUS_CONNECTION_SUBSCRIBED(m, connection))) {
if (!(s = set_new(string_hash_func, string_compare_func)))
goto oom;
if (!(dbus_connection_set_data(connection, m->subscribed_data_slot, s, NULL))) {
set_free(s);
goto oom;
}
}
if (!(client = strdup(dbus_message_get_sender(message))))
goto oom;
r = set_put(m->subscribed, client);
if (r < 0)
if ((r = set_put(s, client)) < 0) {
free(client);
return bus_send_error_reply(m, connection, message, NULL, r);
}
if (!(reply = dbus_message_new_method_return(message)))
goto oom;
......@@ -467,7 +478,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
} else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Unsubscribe")) {
char *client;
if (!(client = set_remove(m->subscribed, (char*) dbus_message_get_sender(message))))
if (!(client = set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) dbus_message_get_sender(message))))
return bus_send_error_reply(m, connection, message, NULL, -ENOENT);
free(client);
......@@ -702,6 +713,11 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
if ((r = manager_add_job(m, job_type, u, mode, true, &j)) < 0)
return bus_send_error_reply(m, connection, message, NULL, r);
if (!(j->bus_client = strdup(dbus_message_get_sender(message))))
goto oom;
j->bus = connection;
if (!(reply = dbus_message_new_method_return(message)))
goto oom;
......@@ -713,6 +729,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
DBUS_TYPE_OBJECT_PATH, &path,
DBUS_TYPE_INVALID))
goto oom;
}
free(path);
......
......@@ -376,7 +376,7 @@ void bus_unit_send_change_signal(Unit *u) {
LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
u->meta.in_dbus_queue = false;
if (set_isempty(u->meta.manager->subscribed)) {
if (!bus_has_subscriber(u->meta.manager)) {
u->meta.sent_dbus_new_signal = true;
return;
}
......@@ -427,7 +427,7 @@ void bus_unit_send_removed_signal(Unit *u) {
assert(u);
if (set_isempty(u->meta.manager->subscribed))
if (!bus_has_subscriber(u->meta.manager))
return;
if (!u->meta.sent_dbus_new_signal)
......
......@@ -355,8 +355,8 @@ static DBusHandlerResult api_bus_message_filter(DBusConnection *connection, DBus
DBUS_TYPE_INVALID))
log_error("Failed to parse NameOwnerChanged message: %s", error.message);
else {
if (set_remove(m->subscribed, (char*) name))
log_debug("Subscription client vanished: %s (left: %u)", name, set_size(m->subscribed));
if (set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) name))
log_debug("Subscription client vanished: %s (left: %u)", name, set_size(BUS_CONNECTION_SUBSCRIBED(m, connection)));
if (old_owner[0] == 0)
old_owner = NULL;
......@@ -882,13 +882,6 @@ static int bus_init_api(Manager *m) {
strnull(dbus_bus_get_unique_name(m->api_bus)));
dbus_free(id);
if (!m->subscribed)
if (!(m->subscribed = set_new(string_hash_func, string_compare_func))) {
log_error("Not enough memory");
r = -ENOMEM;
goto fail;
}
return 0;
fail:
......@@ -959,6 +952,12 @@ int bus_init(Manager *m) {
return -ENOMEM;
}
if (m->subscribed_data_slot < 0)
if (!dbus_pending_call_allocate_data_slot(&m->subscribed_data_slot)) {
log_error("Not enough memory");
return -ENOMEM;
}
if ((r = bus_init_system(m)) < 0 ||
(r = bus_init_api(m)) < 0 ||
(r = bus_init_private(m)) < 0)
......@@ -968,9 +967,30 @@ int bus_init(Manager *m) {
}
static void shutdown_connection(Manager *m, DBusConnection *c) {
Set *s;
Job *j;
Iterator i;
HASHMAP_FOREACH(j, m->jobs, i)
if (j->bus == c) {
free(j->bus_client);
j->bus_client = NULL;
j->bus = NULL;
}
set_remove(m->bus_connections, c);
set_remove(m->bus_connections_for_dispatch, c);
if ((s = BUS_CONNECTION_SUBSCRIBED(m, c))) {
char *t;
while ((t = set_steal_first(s)))
free(t);
set_free(s);
}
dbus_connection_set_dispatch_status_function(c, NULL, NULL, NULL);
dbus_connection_flush(c);
dbus_connection_close(c);
......@@ -988,15 +1008,6 @@ static void bus_done_api(Manager *m) {
m->api_bus = NULL;
}
if (m->subscribed) {
char *c;
while ((c = set_steal_first(m->subscribed)))
free(c);
set_free(m->subscribed);
m->subscribed = NULL;
}
if (m->queued_message) {
dbus_message_unref(m->queued_message);
......@@ -1043,6 +1054,9 @@ void bus_done(Manager *m) {
if (m->name_data_slot >= 0)
dbus_pending_call_free_data_slot(&m->name_data_slot);
if (m->subscribed_data_slot >= 0)
dbus_pending_call_free_data_slot(&m->subscribed_data_slot);
}
static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) {
......@@ -1053,7 +1067,7 @@ static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) {
dbus_error_init(&error);
assert_se(name = dbus_pending_call_get_data(pending, m->name_data_slot));
assert_se(name = BUS_PENDING_CALL_NAME(m, pending));
assert_se(reply = dbus_pending_call_steal_reply(pending));
switch (dbus_message_get_type(reply)) {
......@@ -1538,3 +1552,27 @@ int bus_parse_strv(DBusMessage *m, char ***_l) {
return 0;
}
bool bus_has_subscriber(Manager *m) {
Iterator i;
DBusConnection *c;
assert(m);
SET_FOREACH(c, m->bus_connections_for_dispatch, i)
if (bus_connection_has_subscriber(m, c))
return true;
SET_FOREACH(c, m->bus_connections, i)
if (bus_connection_has_subscriber(m, c))
return true;
return false;
}
bool bus_connection_has_subscriber(Manager *m, DBusConnection *c) {
assert(m);
assert(c);
return !set_isempty(BUS_CONNECTION_SUBSCRIBED(m, c));
}
......@@ -105,6 +105,12 @@ int bus_property_append_ul(Manager *m, DBusMessageIter *i, const char *property,
int bus_parse_strv(DBusMessage *m, char ***_l);
bool bus_has_subscriber(Manager *m);
bool bus_connection_has_subscriber(Manager *m, DBusConnection *c);
#define BUS_CONNECTION_SUBSCRIBED(m, c) dbus_connection_get_data((c), (m)->subscribed_data_slot)
#define BUS_PENDING_CALL_NAME(m, p) dbus_pending_call_get_data((p), (m)->name_data_slot)
extern const char * const bus_interface_table[];
#endif
......@@ -76,6 +76,7 @@ void job_free(Job *j) {
if (j->in_dbus_queue)
LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
free(j->bus_client);
free(j);
}
......@@ -544,10 +545,9 @@ void job_add_to_dbus_queue(Job *j) {
if (j->in_dbus_queue)
return;
if (set_isempty(j->manager->subscribed)) {
j->sent_dbus_new_signal = true;
return;
}
/* We don't check if anybody is subscribed here, since this
* job might just have been created and not yet assigned to a
* connection/client. */
LIST_PREPEND(Job, dbus_queue, j->manager->dbus_job_queue, j);
j->in_dbus_queue = true;
......
......@@ -102,6 +102,10 @@ struct Job {
JobType type;
JobState state;
/* Note that this bus object is not ref counted here. */
DBusConnection *bus;
char *bus_client;
bool installed:1;
bool in_run_queue:1;
bool matters_to_anchor:1;
......
......@@ -198,7 +198,7 @@ int manager_new(ManagerRunningAs running_as, bool confirm_spawn, Manager **_m) {
m->running_as = running_as;
m->confirm_spawn = confirm_spawn;
m->name_data_slot = -1;
m->name_data_slot = m->subscribed_data_slot = -1;
m->exit_code = _MANAGER_EXIT_CODE_INVALID;
m->pin_cgroupfs_fd = -1;
......
......@@ -156,7 +156,6 @@ struct Manager {
DBusServer *private_bus;
Set *bus_connections, *bus_connections_for_dispatch;
Set *subscribed;
DBusMessage *queued_message; /* This is used during reloading:
* before the reload we queue the
* reply message here, and
......@@ -164,6 +163,7 @@ struct Manager {
Hashmap *watch_bus; /* D-Bus names => Unit object n:1 */
int32_t name_data_slot;
int32_t subscribed_data_slot;
/* Data specific to the Automount subsystem */
int dev_autofs_fd;
......
......@@ -583,13 +583,10 @@ static DBusHandlerResult wait_filter(DBusConnection *connection, DBusMessage *me
static int enable_wait_for_jobs(DBusConnection *bus) {
DBusError error;
DBusMessage *m = NULL, *reply = NULL;
int r;
assert(bus);
dbus_error_init(&error);
dbus_bus_add_match(bus,
"type='signal',"
"sender='org.freedesktop.systemd1',"
......@@ -600,40 +597,12 @@ static int enable_wait_for_jobs(DBusConnection *bus) {
if (dbus_error_is_set(&error)) {
log_error("Failed to add match: %s", error.message);
r = -EIO;
goto finish;
}
if (!(m = dbus_message_new_method_call(
"org.freedesktop.systemd1",
"/org/freedesktop/systemd1",
"org.freedesktop.systemd1.Manager",
"Subscribe"))) {
log_error("Could not allocate message.");
r = -ENOMEM;
goto finish;
}
if (!(reply = dbus_connection_send_with_reply_and_block(bus, m, -1, &error))) {
log_error("Failed to issue method call: %s", error.message);
r = -EIO;
goto finish;
dbus_error_free(&error);
return -EIO;
}
r = 0;
finish:
/* This is slightly dirty, since we don't undo the match registrations. */
if (m)
dbus_message_unref(m);
if (reply)
dbus_message_unref(reply);
dbus_error_free(&error);
return r;
return 0;
}
static int wait_for_jobs(DBusConnection *bus, Set *s) {
......
......@@ -282,7 +282,8 @@ void unit_add_to_dbus_queue(Unit *u) {
if (u->meta.load_state == UNIT_STUB || u->meta.in_dbus_queue)
return;
if (set_isempty(u->meta.manager->subscribed)) {
/* Shortcut things if nobody cares */
if (!bus_has_subscriber(u->meta.manager)) {
u->meta.sent_dbus_new_signal = true;
return;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment