diff --git a/src/module-xrdp-sink.c b/src/module-xrdp-sink.c index 9f95fc3..9fe8efe 100644 --- a/src/module-xrdp-sink.c +++ b/src/module-xrdp-sink.c @@ -89,6 +89,13 @@ PA_MODULE_USAGE( #define BLOCK_USEC 30000 //#define BLOCK_USEC (PA_USEC_PER_SEC * 2) +/* support for the set_state_in_io_thread callback was added in 11.99.1 */ +#if defined(PA_CHECK_VERSION) && PA_CHECK_VERSION(11, 99, 1) +#define USE_SET_STATE_IN_IO_THREAD_CB +#else +#undef USE_SET_STATE_IN_IO_THREAD_CB +#endif + struct userdata { pa_core *core; pa_module *module; @@ -172,6 +179,30 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, return pa_sink_process_msg(o, code, data, offset, chunk); } +#ifdef USE_SET_STATE_IN_IO_THREAD_CB +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, + pa_sink_state_t new_state, + pa_suspend_cause_t new_suspend_cause) +{ + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) + { + if (PA_SINK_IS_OPENED(new_state)) + { + pa_log_debug("sink_set_state_in_io_thread_cb: set timestamp"); + u->timestamp = pa_rtclock_now(); + } + } + + return 0; +} +#endif /* USE_SET_STATE_IN_IO_THREAD_CB */ + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -185,9 +216,46 @@ static void sink_update_requested_latency_cb(pa_sink *s) { u->block_usec = s->thread_info.max_latency; } nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); + pa_sink_set_max_rewind_within_thread(s, nbytes); pa_sink_set_max_request_within_thread(s, nbytes); } +static void process_rewind(struct userdata *u, pa_usec_t now) { + size_t rewind_nbytes, in_buffer; + pa_usec_t delay; + + pa_assert(u); + + rewind_nbytes = u->sink->thread_info.rewind_nbytes; + + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewind_nbytes <= 0) + goto do_nothing; + + pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes); + + if (u->timestamp <= now) + goto do_nothing; + + delay = u->timestamp - now; + in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec); + + if (in_buffer <= 0) + goto do_nothing; + + if (rewind_nbytes > in_buffer) + rewind_nbytes = in_buffer; + + pa_sink_process_rewind(u->sink, rewind_nbytes); + u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec); + + pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); + return; + +do_nothing: + + pa_sink_process_rewind(u->sink, 0); +} + struct header { int code; int bytes; @@ -374,7 +442,7 @@ static void thread_func(void *userdata) { now = pa_rtclock_now(); } if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) { - pa_sink_process_rewind(u->sink, 0); + process_rewind(u, now); } /* Render some data and write it to the socket */ if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { @@ -496,6 +564,9 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; +#ifdef USE_SET_STATE_IN_IO_THREAD_CB + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; +#endif u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; @@ -505,6 +576,7 @@ int pa__init(pa_module*m) { u->block_usec = BLOCK_USEC; pa_log_debug("3 block_usec %llu", (unsigned long long) u->block_usec); nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); + pa_sink_set_max_rewind(u->sink, nbytes); pa_sink_set_max_request(u->sink, nbytes); set_sink_socket(ma, u); @@ -524,6 +596,8 @@ int pa__init(pa_module*m) { goto fail; } + pa_sink_set_latency_range(u->sink, 0, BLOCK_USEC); + pa_sink_put(u->sink); pa_modargs_free(ma);