From 2d4cb1ea967950d6a1d6804322cac600a806439d Mon Sep 17 00:00:00 2001 From: matt335672 <30179339+matt335672@users.noreply.github.com> Date: Wed, 7 Oct 2020 11:48:30 +0100 Subject: [PATCH 1/4] Added module params to xrdp-sink.c --- src/module-xrdp-sink.c | 91 +++++++++++++++++++++++----------------- src/module-xrdp-source.c | 71 ++++++++++++++++++------------- 2 files changed, 94 insertions(+), 68 deletions(-) diff --git a/src/module-xrdp-sink.c b/src/module-xrdp-sink.c index 62f92d9..9b5e5c8 100644 --- a/src/module-xrdp-sink.c +++ b/src/module-xrdp-sink.c @@ -81,7 +81,9 @@ PA_MODULE_USAGE( "format= " "rate= " "channels= " - "channel_map="); + "channel_map= " + "xrdp_socket_path= " + "xrdp_pulse_sink_socket="); #define DEFAULT_SINK_NAME "xrdp-sink" #define BLOCK_USEC 30000 @@ -102,10 +104,9 @@ struct userdata { pa_usec_t last_send_time; int fd; /* unix domain socket connection to xrdp chansrv */ - int display_num; int skip_bytes; - int got_max_latency; + char *sink_socket; }; static const char* const valid_modargs[] = { @@ -115,6 +116,8 @@ static const char* const valid_modargs[] = { "rate", "channels", "channel_map", + "xrdp_socket_path", + "xrdp_pulse_sink_socket", NULL }; @@ -127,27 +130,30 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, pa_usec_t now; long lat; - pa_log_debug("sink_process_msg: code %d", code); - switch (code) { - case PA_SINK_MESSAGE_SET_VOLUME: /* 3 */ + case PA_SINK_MESSAGE_SET_VOLUME: + pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_VOLUME"); break; - case PA_SINK_MESSAGE_SET_MUTE: /* 6 */ + case PA_SINK_MESSAGE_SET_MUTE: + pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_MUTE"); break; - case PA_SINK_MESSAGE_GET_LATENCY: /* 7 */ + case PA_SINK_MESSAGE_GET_LATENCY: + pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_GET_LATENCY"); now = pa_rtclock_now(); lat = u->timestamp > now ? u->timestamp - now : 0ULL; pa_log_debug("sink_process_msg: lat %ld", lat); *((pa_usec_t*) data) = lat; return 0; - case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY: /* 8 */ + case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY: + pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_GET_REQUESTED_LATENCY"); break; - case PA_SINK_MESSAGE_SET_STATE: /* 9 */ + case PA_SINK_MESSAGE_SET_STATE: + pa_log_debug("sink_process_msg: PA_SINK_MESSAGE_SET_STATE"); if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ { pa_log("sink_process_msg: running"); @@ -158,6 +164,9 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, } break; + default: + pa_log_debug("sink_process_msg: code %d", code); + } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -184,7 +193,7 @@ struct header { int bytes; }; -static int get_display_num_from_display(char *display_text) { +static int get_display_num_from_display(const char *display_text) { int index; int mode; int host_index; @@ -248,8 +257,6 @@ static int lsend(int fd, char *data, int bytes) { static int data_send(struct userdata *u, pa_memchunk *chunk) { char *data; - char *socket_dir; - char *sink_socket; int bytes; int sent; int fd; @@ -265,26 +272,7 @@ static int data_send(struct userdata *u, pa_memchunk *chunk) { fd = socket(PF_LOCAL, SOCK_STREAM, 0); memset(&s, 0, sizeof(s)); s.sun_family = AF_UNIX; - bytes = sizeof(s.sun_path) - 1; - socket_dir = getenv("XRDP_SOCKET_PATH"); - if (socket_dir == NULL || socket_dir[0] == '\0') - { - socket_dir = "/tmp/.xrdp"; - } - sink_socket = getenv("XRDP_PULSE_SINK_SOCKET"); - if (sink_socket == NULL || sink_socket[0] == '\0') - { - pa_log_debug("Could not obtain sink_socket from environment."); - /* usually it doesn't reach here. if the socket name is not given - via environment variable, use hardcoded name as fallback */ - snprintf(s.sun_path, bytes, - "%s/xrdp_chansrv_audio_out_socket_%d", socket_dir, u->display_num); - } - else - { - pa_log_debug("Obtained sink_socket from environment."); - snprintf(s.sun_path, bytes, "%s/%s", socket_dir, sink_socket); - } + pa_strlcpy(s.sun_path, u->sink_socket, sizeof(s.sun_path)); pa_log_debug("trying to connect to %s", s.sun_path); if (connect(fd, (struct sockaddr *)&s, @@ -353,9 +341,6 @@ static void process_render(struct userdata *u, pa_usec_t now) { int request_bytes; pa_assert(u); - if (u->got_max_latency) { - return; - } pa_log_debug("process_render: u->block_usec %llu", (unsigned long long) u->block_usec); while (u->timestamp < now + u->block_usec) { request_bytes = u->sink->thread_info.max_request; @@ -372,8 +357,6 @@ static void process_render(struct userdata *u, pa_usec_t now) { static void thread_func(void *userdata) { struct userdata *u = userdata; - int ret; - pa_usec_t now; pa_assert(u); @@ -428,6 +411,35 @@ finish: pa_log_debug("Thread shutting down"); } +static void set_sink_socket(pa_modargs *ma, struct userdata *u) { + const char *socket_dir; + const char *socket_name; + char default_socket_name[64]; + size_t nbytes; + + socket_dir = pa_modargs_get_value(ma, "xrdp_socket_path", + getenv("XRDP_SOCKET_PATH")); + if (socket_dir == NULL || socket_dir[0] == '\0') { + socket_dir = "/tmp/.xrdp"; + } + + socket_name = pa_modargs_get_value(ma, "xrdp_pulse_sink_socket", + getenv("XRDP_PULSE_SINK_SOCKET")); + if (socket_name == NULL || socket_name[0] == '\0') + { + int display_num = get_display_num_from_display(getenv("DISPLAY")); + + pa_log_debug("Could not obtain sink_socket from environment."); + snprintf(default_socket_name, sizeof(default_socket_name), + "xrdp_chansrv_audio_out_socket_%d", display_num); + socket_name = default_socket_name; + } + + nbytes = strlen(socket_dir) + 1 + strlen(socket_name) + 1; + u->sink_socket = pa_xmalloc(nbytes); + snprintf(u->sink_socket, nbytes, "%s/%s", socket_dir, socket_name); +} + int pa__init(pa_module*m) { struct userdata *u = NULL; pa_sample_spec ss; @@ -495,7 +507,7 @@ int pa__init(pa_module*m) { nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); pa_sink_set_max_request(u->sink, nbytes); - u->display_num = get_display_num_from_display(getenv("DISPLAY")); + set_sink_socket(ma, u); u->fd = -1; @@ -566,5 +578,6 @@ void pa__done(pa_module*m) { pa_rtpoll_free(u->rtpoll); } + pa_xfree(u->sink_socket); pa_xfree(u); } diff --git a/src/module-xrdp-source.c b/src/module-xrdp-source.c index de4ef48..6ce472e 100644 --- a/src/module-xrdp-source.c +++ b/src/module-xrdp-source.c @@ -69,7 +69,9 @@ PA_MODULE_USAGE( "source_name= " "channel_map= " "description= " - "latency_time="); + "latency_time= " + "xrdp_socket_path= " + "xrdp_pulse_source_socket="); #define DEFAULT_SOURCE_NAME "xrdp-source" #define DEFAULT_LATENCY_TIME 10 @@ -92,7 +94,7 @@ struct userdata { /* xrdp stuff */ int fd; /* UDS connection to xrdp chansrv */ - int display_num; /* X display number */ + char *source_socket; int want_src_data; }; @@ -104,10 +106,12 @@ static const char* const valid_modargs[] = { "channel_map", "description", "latency_time", + "xrdp_socket_path", + "xrdp_pulse_source_socket", NULL }; -static int get_display_num_from_display(char *display_text) ; +static int get_display_num_from_display(const char *display_text) ; static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { @@ -178,8 +182,6 @@ static int data_get(struct userdata *u, pa_memchunk *chunk) { int read_bytes; struct sockaddr_un s; char *data; - char *socket_dir; - char *source_socket; char buf[11]; unsigned char ubuf[10]; @@ -188,27 +190,7 @@ static int data_get(struct userdata *u, pa_memchunk *chunk) { fd = socket(PF_LOCAL, SOCK_STREAM, 0); memset(&s, 0, sizeof(s)); s.sun_family = AF_UNIX; - bytes = sizeof(s.sun_path) - 1; - socket_dir = getenv("XRDP_SOCKET_PATH"); - if (socket_dir == NULL || socket_dir[0] == '\0') - { - socket_dir = "/tmp/.xrdp"; - } - source_socket = getenv("XRDP_PULSE_SOURCE_SOCKET"); - if (source_socket == NULL || source_socket[0] == '\0') - { - - pa_log_debug("Could not obtain source_socket from environment."); - /* usually it doesn't reach here. if the socket name is not given - via environment variable, use hardcoded name as fallback */ - snprintf(s.sun_path, bytes, - "%s/xrdp_chansrv_audio_in_socket_%d", socket_dir, u->display_num); - } - else - { - pa_log_debug("Obtained source_socket from environment."); - snprintf(s.sun_path, bytes, "%s/%s", socket_dir, source_socket); - } + pa_strlcpy(s.sun_path, u->source_socket, sizeof(s.sun_path)); pa_log_debug("Trying to connect to %s", s.sun_path); if (connect(fd, (struct sockaddr *) &s, sizeof(struct sockaddr_un)) != 0) { @@ -386,6 +368,36 @@ finish: pa_log_debug("###### thread shutting down"); } +static void set_source_socket(pa_modargs *ma, struct userdata *u) { + const char *socket_dir; + const char *socket_name; + char default_socket_name[64]; + size_t nbytes; + + + socket_dir = pa_modargs_get_value(ma, "xrdp_socket_path", + getenv("XRDP_SOCKET_PATH")); + if (socket_dir == NULL || socket_dir[0] == '\0') { + socket_dir = "/tmp/.xrdp"; + } + + socket_name = pa_modargs_get_value(ma, "xrdp_pulse_source_socket", + getenv("XRDP_PULSE_SOURCE_SOCKET")); + if (socket_name == NULL || socket_name[0] == '\0') + { + int display_num = get_display_num_from_display(getenv("DISPLAY")); + + pa_log_debug("Could not obtain source_socket from environment."); + snprintf(default_socket_name, sizeof(default_socket_name), + "xrdp_chansrv_audio_out_socket_%d", display_num); + socket_name = default_socket_name; + } + + nbytes = strlen(socket_dir) + 1 + strlen(socket_name) + 1; + u->source_socket = pa_xmalloc(nbytes); + snprintf(u->source_socket, nbytes, "%s/%s", socket_dir, socket_name); +} + int pa__init(pa_module *m) { struct userdata *u = NULL; pa_sample_spec ss; @@ -458,6 +470,8 @@ int pa__init(pa_module *m) { u->source->thread_info.max_rewind = pa_usec_to_bytes(u->block_usec, &u->source->sample_spec); + set_source_socket(ma, u); + u->fd = -1; #if defined(PA_CHECK_VERSION) @@ -477,8 +491,6 @@ int pa__init(pa_module *m) { pa_modargs_free(ma); - u->display_num = get_display_num_from_display(getenv("DISPLAY")); - return 0; fail: @@ -514,10 +526,11 @@ void pa__done(pa_module*m) { if (u->rtpoll) pa_rtpoll_free(u->rtpoll); + pa_xfree(u->source_socket); pa_xfree(u); } -static int get_display_num_from_display(char *display_text) { +static int get_display_num_from_display(const char *display_text) { int index; int mode; int host_index; From 127a7e27ae2840a26024a7e3d5fa0a32172f203e Mon Sep 17 00:00:00 2001 From: matt335672 <30179339+matt335672@users.noreply.github.com> Date: Thu, 30 Sep 2021 12:28:29 +0100 Subject: [PATCH 2/4] Close file socket on module unload --- src/module-xrdp-sink.c | 6 ++++++ src/module-xrdp-source.c | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/module-xrdp-sink.c b/src/module-xrdp-sink.c index 9b5e5c8..9f95fc3 100644 --- a/src/module-xrdp-sink.c +++ b/src/module-xrdp-sink.c @@ -578,6 +578,12 @@ void pa__done(pa_module*m) { pa_rtpoll_free(u->rtpoll); } + if (u->fd >= 0) + { + close(u->fd); + u->fd = -1; + } + pa_xfree(u->sink_socket); pa_xfree(u); } diff --git a/src/module-xrdp-source.c b/src/module-xrdp-source.c index 6ce472e..3a051d8 100644 --- a/src/module-xrdp-source.c +++ b/src/module-xrdp-source.c @@ -526,6 +526,12 @@ void pa__done(pa_module*m) { if (u->rtpoll) pa_rtpoll_free(u->rtpoll); + if (u->fd >= 0) + { + close(u->fd); + u->fd = -1; + } + pa_xfree(u->source_socket); pa_xfree(u); } From 6f1857357d436c3063813a50f7856e6539d600e4 Mon Sep 17 00:00:00 2001 From: matt335672 <30179339+matt335672@users.noreply.github.com> Date: Thu, 30 Sep 2021 12:31:43 +0100 Subject: [PATCH 3/4] Latency improvements to the sink --- src/module-xrdp-sink.c | 76 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/src/module-xrdp-sink.c b/src/module-xrdp-sink.c index 9f95fc3..9fe8efe 100644 --- a/src/module-xrdp-sink.c +++ b/src/module-xrdp-sink.c @@ -89,6 +89,13 @@ PA_MODULE_USAGE( #define BLOCK_USEC 30000 //#define BLOCK_USEC (PA_USEC_PER_SEC * 2) +/* support for the set_state_in_io_thread callback was added in 11.99.1 */ +#if defined(PA_CHECK_VERSION) && PA_CHECK_VERSION(11, 99, 1) +#define USE_SET_STATE_IN_IO_THREAD_CB +#else +#undef USE_SET_STATE_IN_IO_THREAD_CB +#endif + struct userdata { pa_core *core; pa_module *module; @@ -172,6 +179,30 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, return pa_sink_process_msg(o, code, data, offset, chunk); } +#ifdef USE_SET_STATE_IN_IO_THREAD_CB +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, + pa_sink_state_t new_state, + pa_suspend_cause_t new_suspend_cause) +{ + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) + { + if (PA_SINK_IS_OPENED(new_state)) + { + pa_log_debug("sink_set_state_in_io_thread_cb: set timestamp"); + u->timestamp = pa_rtclock_now(); + } + } + + return 0; +} +#endif /* USE_SET_STATE_IN_IO_THREAD_CB */ + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -185,9 +216,46 @@ static void sink_update_requested_latency_cb(pa_sink *s) { u->block_usec = s->thread_info.max_latency; } nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); + pa_sink_set_max_rewind_within_thread(s, nbytes); pa_sink_set_max_request_within_thread(s, nbytes); } +static void process_rewind(struct userdata *u, pa_usec_t now) { + size_t rewind_nbytes, in_buffer; + pa_usec_t delay; + + pa_assert(u); + + rewind_nbytes = u->sink->thread_info.rewind_nbytes; + + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewind_nbytes <= 0) + goto do_nothing; + + pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes); + + if (u->timestamp <= now) + goto do_nothing; + + delay = u->timestamp - now; + in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec); + + if (in_buffer <= 0) + goto do_nothing; + + if (rewind_nbytes > in_buffer) + rewind_nbytes = in_buffer; + + pa_sink_process_rewind(u->sink, rewind_nbytes); + u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec); + + pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); + return; + +do_nothing: + + pa_sink_process_rewind(u->sink, 0); +} + struct header { int code; int bytes; @@ -374,7 +442,7 @@ static void thread_func(void *userdata) { now = pa_rtclock_now(); } if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) { - pa_sink_process_rewind(u->sink, 0); + process_rewind(u, now); } /* Render some data and write it to the socket */ if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { @@ -496,6 +564,9 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; +#ifdef USE_SET_STATE_IN_IO_THREAD_CB + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; +#endif u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; @@ -505,6 +576,7 @@ int pa__init(pa_module*m) { u->block_usec = BLOCK_USEC; pa_log_debug("3 block_usec %llu", (unsigned long long) u->block_usec); nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); + pa_sink_set_max_rewind(u->sink, nbytes); pa_sink_set_max_request(u->sink, nbytes); set_sink_socket(ma, u); @@ -524,6 +596,8 @@ int pa__init(pa_module*m) { goto fail; } + pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC); + pa_sink_put(u->sink); pa_modargs_free(ma); From d2284f11fcf0c7689b2b324063bf5725f5848cc8 Mon Sep 17 00:00:00 2001 From: matt335672 <30179339+matt335672@users.noreply.github.com> Date: Sat, 30 Oct 2021 12:26:52 +0100 Subject: [PATCH 4/4] Fixed compiler warning with -Wunused-parameter --- src/module-xrdp-sink.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/module-xrdp-sink.c b/src/module-xrdp-sink.c index 9fe8efe..fa3817d 100644 --- a/src/module-xrdp-sink.c +++ b/src/module-xrdp-sink.c @@ -88,6 +88,7 @@ PA_MODULE_USAGE( #define DEFAULT_SINK_NAME "xrdp-sink" #define BLOCK_USEC 30000 //#define BLOCK_USEC (PA_USEC_PER_SEC * 2) +#define UNUSED_VAR(x) ((void) (x)) /* support for the set_state_in_io_thread callback was added in 11.99.1 */ #if defined(PA_CHECK_VERSION) && PA_CHECK_VERSION(11, 99, 1) @@ -187,6 +188,8 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, { struct userdata *u; + UNUSED_VAR(new_suspend_cause); + pa_assert(s); pa_assert_se(u = s->userdata);