From 7e885c029a6cc66ce1881f6f2c50d5f76a7d3372 Mon Sep 17 00:00:00 2001
From: Wim Taymans <wtaymans@redhat.com>
Date: Tue, 1 Oct 2019 12:53:56 +0200
Subject: [PATCH] protocol: improve flushing

Use the IO_OUT flag to schedule flushing instead of a flush_event.

Handle EGAIN and wait for IO_OUT to try again.

Fixes #111

Upstream-Status: Backport [cc8e992cd155b4f19312a5036c7b744fc547410f]
---
 src/modules/module-protocol-native.c          | 89 +++++++++++++------
 .../module-protocol-native/connection.c       |  2 -
 2 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/src/modules/module-protocol-native.c b/src/modules/module-protocol-native.c
index 411bad6c..b7cd3140 100644
--- a/src/modules/module-protocol-native.c
+++ b/src/modules/module-protocol-native.c
@@ -83,8 +83,7 @@ struct client {
         struct spa_hook conn_listener;
 
         bool disconnecting;
-	bool flush_signaled;
-        struct spa_source *flush_event;
+	bool flushing;
 };
 
 struct server {
@@ -106,6 +105,7 @@ struct client_data {
 	struct spa_source *source;
 	struct pw_protocol_native_connection *connection;
 	bool busy;
+	bool need_flush;
 };
 
 static void
@@ -194,12 +194,14 @@ client_busy_changed(void *data, bool busy)
 {
 	struct client_data *c = data;
 	struct pw_client *client = c->client;
-	enum spa_io mask = SPA_IO_ERR | SPA_IO_HUP;
+	uint32_t mask = c->source->mask;
 
 	c->busy = busy;
 
-	if (!busy)
-		mask |= SPA_IO_IN;
+	if (busy)
+		SPA_FLAG_UNSET(mask, SPA_IO_IN);
+	else
+		SPA_FLAG_SET(mask, SPA_IO_IN);
 
 	pw_log_debug("protocol-native %p: busy changed %d", client->protocol, busy);
 	pw_loop_update_io(client->core->main_loop, c->source, mask);
@@ -214,13 +216,32 @@ connection_data(void *data, int fd, enum spa_io mask)
 {
 	struct client_data *this = data;
 	struct pw_client *client = this->client;
+	int res;
 
-	if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
+	if (mask & SPA_IO_HUP) {
 		pw_log_info("protocol-native %p: client %p disconnected", client->protocol, client);
 		pw_client_destroy(client);
 		return;
 	}
-
+	if (mask & SPA_IO_ERR) {
+		pw_log_error("protocol-native %p: client %p error", client->protocol, client);
+		pw_client_destroy(client);
+		return;
+	}
+	if (mask & SPA_IO_OUT) {
+		res = pw_protocol_native_connection_flush(this->connection);
+		if (res >= 0) {
+			int mask = this->source->mask;
+			SPA_FLAG_UNSET(mask, SPA_IO_OUT);
+			pw_loop_update_io(client->protocol->core->main_loop,
+					this->source, mask);
+		} else if (res != EAGAIN) {
+			pw_log_error("client %p: could not flush: %s",
+					client, spa_strerror(res));
+			pw_client_destroy(client);
+			return;
+		}
+	}
 	if (mask & SPA_IO_IN)
 		process_messages(this);
 }
@@ -288,7 +309,8 @@ static struct pw_client *client_new(struct server *s, int fd)
 
 	this->client = client;
 	this->source = pw_loop_add_io(pw_core_get_main_loop(core),
-				      fd, SPA_IO_ERR | SPA_IO_HUP, true, connection_data, this);
+				      fd, SPA_IO_ERR | SPA_IO_HUP, true,
+				      connection_data, this);
 	if (this->source == NULL)
 		goto cleanup_client;
 
@@ -396,7 +418,7 @@ socket_data(void *data, int fd, enum spa_io mask)
 
 	if (!client->busy)
 		pw_loop_update_io(client->protocol->core->main_loop,
-			  c->source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
+				c->source, c->source->mask | SPA_IO_IN);
 }
 
 static bool add_socket(struct pw_protocol *protocol, struct server *s)
@@ -479,6 +501,17 @@ on_remote_data(void *data, int fd, enum spa_io mask)
 		res = -EPIPE;
 		goto error;
 	}
+	if (mask & SPA_IO_OUT) {
+		res = pw_protocol_native_connection_flush(conn);
+		if (res >= 0) {
+			int mask = impl->source->mask;
+			SPA_FLAG_UNSET(mask, SPA_IO_OUT);
+			pw_loop_update_io(core->main_loop,
+					impl->source, mask);
+			impl->flushing = false;
+		} else if (res != EAGAIN)
+			goto error;
+	}
 
         if (mask & SPA_IO_IN) {
 		const struct pw_protocol_native_message *msg;
@@ -545,23 +578,17 @@ error:
 }
 
 
-static void do_flush_event(void *data, uint64_t count)
-{
-	struct client *impl = data;
-	impl->flush_signaled = false;
-	if (impl->connection)
-		if (pw_protocol_native_connection_flush(impl->connection) < 0)
-			impl->this.disconnect(&impl->this);
-}
-
 static void on_need_flush(void *data)
 {
         struct client *impl = data;
         struct pw_remote *remote = impl->this.remote;
 
-	if (!impl->flush_signaled) {
-		impl->flush_signaled = true;
-		pw_loop_signal_event(remote->core->main_loop, impl->flush_event);
+	if (!impl->flushing) {
+		int mask = impl->source->mask;
+		impl->flushing = true;
+		SPA_FLAG_SET(mask, SPA_IO_OUT);
+		pw_loop_update_io(remote->core->main_loop,
+					impl->source, mask);
 	}
 }
 
@@ -619,12 +646,9 @@ static void impl_disconnect(struct pw_protocol_client *client)
 static void impl_destroy(struct pw_protocol_client *client)
 {
 	struct client *impl = SPA_CONTAINER_OF(client, struct client, this);
-	struct pw_remote *remote = client->remote;
 
 	impl_disconnect(client);
 
-	pw_loop_destroy_source(remote->core->main_loop, impl->flush_event);
-
 	if (impl->properties)
 		pw_properties_free(impl->properties);
 
@@ -665,8 +689,6 @@ impl_new_client(struct pw_protocol *protocol,
 	this->disconnect = impl_disconnect;
 	this->destroy = impl_destroy;
 
-	impl->flush_event = pw_loop_add_event(remote->core->main_loop, do_flush_event, impl);
-
 	spa_list_append(&protocol->client_list, &this->link);
 
 	return this;
@@ -701,10 +723,23 @@ static void on_before_hook(void *_data)
 	struct pw_protocol_server *this = &server->this;
 	struct pw_client *client, *tmp;
 	struct client_data *data;
+	int res;
 
 	spa_list_for_each_safe(client, tmp, &this->client_list, protocol_link) {
 		data = client->user_data;
-		pw_protocol_native_connection_flush(data->connection);
+
+		res = pw_protocol_native_connection_flush(data->connection);
+		if (res == -EAGAIN) {
+			int mask = data->source->mask;
+			SPA_FLAG_SET(mask, SPA_IO_OUT);
+			pw_loop_update_io(client->protocol->core->main_loop,
+					data->source, mask);
+		} else if (res < 0) {
+			pw_log_warn("client %p: could not flush: %s",
+					data->client, spa_strerror(res));
+			pw_client_destroy(client);
+		}
+
 	}
 }
 
diff --git a/src/modules/module-protocol-native/connection.c b/src/modules/module-protocol-native/connection.c
index cb592e41..8b9e919d 100644
--- a/src/modules/module-protocol-native/connection.c
+++ b/src/modules/module-protocol-native/connection.c
@@ -493,8 +493,6 @@ int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *co
 					continue;
 				else {
 					res = -errno;
-					pw_log_error("could not sendmsg on fd:%d n_fds:%d: %s",
-							conn->fd, n_fds, spa_strerror(res));
 					goto exit;
 				}
 			}
-- 
2.23.0

