Browse Source

chansrv: work on pulse sink

pull/1/head
Jay Sorg 11 years ago
parent
commit
5a0d34c1a5
2 changed files with 107 additions and 87 deletions
  1. +104
    -85
      module-xrdp-sink.c
  2. +3
    -2
      pulse-notes.txt

+ 104
- 85
module-xrdp-sink.c View File

@ -88,9 +88,13 @@ struct userdata {
pa_usec_t block_usec; pa_usec_t block_usec;
pa_usec_t timestamp; pa_usec_t timestamp;
pa_usec_t failed_connect_time;
pa_usec_t last_send_time;
int fd; /* unix domain socket connection to chansrv */
pa_memchunk memchunk;
int fd; /* unix domain socket connection to xrdp chansrv */
int display_num;
int skip_bytes;
int got_max_latency;
}; };
@ -111,28 +115,35 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data,
pa_usec_t now; pa_usec_t now;
long lat; long lat;
pa_log("sink_process_msg: code %d", code);
//pa_log("sink_process_msg: code %d", code);
switch (code) { switch (code) {
case PA_SINK_MESSAGE_SET_STATE: /* 9 */
case PA_SINK_MESSAGE_SET_VOLUME: /* 3 */
break;
case PA_SINK_MESSAGE_SET_MUTE: /* 6 */
break;
case PA_SINK_MESSAGE_GET_LATENCY: /* 7 */
now = pa_rtclock_now();
lat = u->timestamp > now ? u->timestamp - now : 0ULL;
//pa_log("sink_process_msg: lat %ld", lat);
*((pa_usec_t*) data) = lat;
return 0;
case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY: /* 8 */
break;
case PA_SINK_MESSAGE_SET_STATE: /* 9 */
if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ { if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ {
pa_log("sink_process_msg: running"); pa_log("sink_process_msg: running");
u->timestamp = pa_rtclock_now(); u->timestamp = pa_rtclock_now();
} else { } else {
pa_log("sink_process_msg: not running"); pa_log("sink_process_msg: not running");
} }
break; break;
case PA_SINK_MESSAGE_GET_LATENCY:
now = pa_rtclock_now();
lat = u->timestamp > now ? u->timestamp - now : 0ULL;
pa_log("sink_process_msg: lat %ld", lat);
*((pa_usec_t*) data) = lat;
return 0;
} }
return pa_sink_process_msg(o, code, data, offset, chunk); return pa_sink_process_msg(o, code, data, offset, chunk);
@ -147,8 +158,11 @@ static void sink_update_requested_latency_cb(pa_sink *s) {
u->block_usec = pa_sink_get_requested_latency_within_thread(s); u->block_usec = pa_sink_get_requested_latency_within_thread(s);
if (u->block_usec == (pa_usec_t) -1)
u->got_max_latency = 0;
if (u->block_usec == (pa_usec_t) -1) {
u->block_usec = s->thread_info.max_latency; u->block_usec = s->thread_info.max_latency;
u->got_max_latency = 1;
}
nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
pa_sink_set_max_rewind_within_thread(s, nbytes); pa_sink_set_max_rewind_within_thread(s, nbytes);
@ -166,7 +180,8 @@ static void process_rewind(struct userdata *u, pa_usec_t now) {
u->sink->thread_info.rewind_nbytes = 0; u->sink->thread_info.rewind_nbytes = 0;
pa_assert(rewind_nbytes > 0); pa_assert(rewind_nbytes > 0);
pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes);
pa_log_debug("Requested to rewind %lu bytes.",
(unsigned long) rewind_nbytes);
if (u->timestamp <= now) if (u->timestamp <= now)
goto do_nothing; goto do_nothing;
@ -182,6 +197,7 @@ static void process_rewind(struct userdata *u, pa_usec_t now) {
pa_sink_process_rewind(u->sink, rewind_nbytes); pa_sink_process_rewind(u->sink, rewind_nbytes);
u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec); u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec);
u->skip_bytes += rewind_nbytes;
pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
return; return;
@ -191,8 +207,7 @@ do_nothing:
pa_sink_process_rewind(u->sink, 0); pa_sink_process_rewind(u->sink, 0);
} }
struct header
{
struct header {
int code; int code;
int bytes; int bytes;
}; };
@ -243,32 +258,57 @@ static int get_display_num_from_display(char *display_text) {
return display_num; return display_num;
} }
static int data_send(struct userdata *u) {
static int data_send(struct userdata *u, pa_memchunk *chunk) {
char *data; char *data;
int bytes; int bytes;
int sent; int sent;
int display_num; int display_num;
int fd;
struct header h; struct header h;
struct sockaddr_un s;
if (u->fd == 0) { if (u->fd == 0) {
int fd = socket(PF_LOCAL, SOCK_STREAM, 0);
struct sockaddr_un s = { 0 };
if (u->failed_connect_time != 0) {
if (pa_rtclock_now() - u->failed_connect_time < 1000000) {
return 0;
}
}
fd = socket(PF_LOCAL, SOCK_STREAM, 0);
memset(&s, 0, sizeof(s));
s.sun_family = AF_UNIX; s.sun_family = AF_UNIX;
display_num = get_display_num_from_display(getenv("DISPLAY")); display_num = get_display_num_from_display(getenv("DISPLAY"));
bytes = sizeof(s.sun_path) - 1; bytes = sizeof(s.sun_path) - 1;
snprintf(s.sun_path, bytes, CHANSRV_PORT_STR, display_num); snprintf(s.sun_path, bytes, CHANSRV_PORT_STR, display_num);
pa_log("trying to conenct to %s", s.sun_path);
if (connect(fd, (struct sockaddr *)&s, if (connect(fd, (struct sockaddr *)&s,
sizeof(struct sockaddr_un)) != 0) { sizeof(struct sockaddr_un)) != 0) {
//pa_log("Connected failed");
u->failed_connect_time = pa_rtclock_now();
pa_log("Connected failed");
close(fd); close(fd);
return 0; return 0;
} }
u->fd = fd;
u->failed_connect_time = 0;
pa_log("Connected ok fd %d", fd); pa_log("Connected ok fd %d", fd);
u->fd = fd;
} }
bytes = u->memchunk.length;
pa_log("bytes %d", bytes);
bytes = chunk->length;
//pa_log("bytes %d", bytes);
/* from rewind */
if (u->skip_bytes > 0)
{
if (bytes > u->skip_bytes)
{
bytes -= u->skip_bytes;
u->skip_bytes = 0;
}
else
{
u->skip_bytes -= bytes;
return bytes;
}
}
h.code = 0; h.code = 0;
h.bytes = bytes + 8; h.bytes = bytes + 8;
@ -278,13 +318,13 @@ static int data_send(struct userdata *u) {
u->fd = 0; u->fd = 0;
return 0; return 0;
} else { } else {
pa_log("data_send: sent header ok bytes %d", bytes);
//pa_log("data_send: sent header ok bytes %d", bytes);
} }
data = (char*)pa_memblock_acquire(u->memchunk.memblock);
data += u->memchunk.index;
data = (char*)pa_memblock_acquire(chunk->memblock);
data += chunk->index;
sent = send(u->fd, data, bytes, 0); sent = send(u->fd, data, bytes, 0);
pa_memblock_release(u->memchunk.memblock);
pa_memblock_release(chunk->memblock);
if (sent != bytes) { if (sent != bytes) {
pa_log("data_send: send failed sent %d bytes %d", sent, bytes); pa_log("data_send: send failed sent %d bytes %d", sent, bytes);
@ -293,65 +333,37 @@ static int data_send(struct userdata *u) {
return 0; return 0;
} }
u->memchunk.index += sent;
u->memchunk.length -= sent;
if (u->memchunk.length <= 0) {
pa_memblock_unref(u->memchunk.memblock);
pa_memchunk_reset(&u->memchunk);
}
return sent; return sent;
} }
static void process_render(struct userdata *u, pa_usec_t now) { static void process_render(struct userdata *u, pa_usec_t now) {
pa_memchunk chunk; pa_memchunk chunk;
int request_bytes; int request_bytes;
int index;
size_t ate = 0;
//int index;
pa_assert(u); pa_assert(u);
/* This is the configured latency. Sink inputs connected to us
might not have a single frame more than the maxrequest value
queed. Hence: at maximum read this many bytes from the sink
inputs. */
if (u->got_max_latency) {
return;
}
index = 0;
/* Fill the buffer up the the latency size */
//index = 0;
while (u->timestamp < now + u->block_usec) { while (u->timestamp < now + u->block_usec) {
//index++;
//if (index > 3) {
/* used when u->block_usec and
u->sink->thread_info.max_request get big
using got_max_latency now */
// return;
//}
request_bytes = u->sink->thread_info.max_request; request_bytes = u->sink->thread_info.max_request;
request_bytes = MIN(request_bytes, 8192);
request_bytes = MIN(request_bytes, 16 * 1024);
pa_sink_render(u->sink, request_bytes, &chunk); pa_sink_render(u->sink, request_bytes, &chunk);
index++;
pa_log("bytes %d index %d", chunk.length, index);
//pa_log("bytes %d index %d", chunk.length, index);
data_send(u, &chunk);
pa_memblock_unref(chunk.memblock); pa_memblock_unref(chunk.memblock);
/* pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */
u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec); u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec);
ate += chunk.length;
//if (ate >= u->sink->thread_info.max_request)
// break;
} }
/* pa_log_debug("Ate in sum %lu bytes (of %lu)", (unsigned long) ate, (unsigned long) nbytes); */
//pa_log("%d", u->memchunk.length);
//pa_log("a");
//if (u->memchunk.length <= 0)
// pa_sink_render(u->sink, 8192, &u->memchunk);
//pa_log("b");
//data_send(u);
//pa_log("c");
} }
static void thread_func(void *userdata) { static void thread_func(void *userdata) {
@ -430,7 +442,8 @@ int pa__init(pa_module*m) {
ss = m->core->default_sample_spec; ss = m->core->default_sample_spec;
map = m->core->default_channel_map; map = m->core->default_channel_map;
if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map,
PA_CHANNEL_MAP_DEFAULT) < 0) {
pa_log("Invalid sample format specification or channel map"); pa_log("Invalid sample format specification or channel map");
goto fail; goto fail;
} }
@ -444,19 +457,22 @@ int pa__init(pa_module*m) {
pa_sink_new_data_init(&data); pa_sink_new_data_init(&data);
data.driver = __FILE__; data.driver = __FILE__;
data.module = m; data.module = m;
pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
pa_sink_new_data_set_name(&data,
pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
pa_sink_new_data_set_sample_spec(&data, &ss); pa_sink_new_data_set_sample_spec(&data, &ss);
pa_sink_new_data_set_channel_map(&data, &map); pa_sink_new_data_set_channel_map(&data, &map);
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "xrdp sink"); pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "xrdp sink");
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "abstract"); pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "abstract");
if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist,
PA_UPDATE_REPLACE) < 0) {
pa_log("Invalid properties"); pa_log("Invalid properties");
pa_sink_new_data_done(&data); pa_sink_new_data_done(&data);
goto fail; goto fail;
} }
u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY);
u->sink = pa_sink_new(m->core, &data,
PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY);
pa_sink_new_data_done(&data); pa_sink_new_data_done(&data);
if (!u->sink) { if (!u->sink) {
@ -476,7 +492,7 @@ int pa__init(pa_module*m) {
pa_sink_set_max_rewind(u->sink, nbytes); pa_sink_set_max_rewind(u->sink, nbytes);
pa_sink_set_max_request(u->sink, nbytes); pa_sink_set_max_request(u->sink, nbytes);
pa_memchunk_reset(&u->memchunk);
u->display_num = get_display_num_from_display(getenv("DISPLAY"));
#if defined(PA_CHECK_VERSION) #if defined(PA_CHECK_VERSION)
#if PA_CHECK_VERSION(0, 9, 22) #if PA_CHECK_VERSION(0, 9, 22)
@ -498,8 +514,9 @@ int pa__init(pa_module*m) {
return 0; return 0;
fail: fail:
if (ma)
if (ma) {
pa_modargs_free(ma); pa_modargs_free(ma);
}
pa__done(m); pa__done(m);
@ -520,27 +537,29 @@ void pa__done(pa_module*m) {
pa_assert(m); pa_assert(m);
if (!(u = m->userdata))
if (!(u = m->userdata)) {
return; return;
}
if (u->sink)
if (u->sink) {
pa_sink_unlink(u->sink); pa_sink_unlink(u->sink);
if (u->memchunk.memblock)
pa_memblock_unref(u->memchunk.memblock);
}
if (u->thread) { if (u->thread) {
pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN,
NULL, 0, NULL);
pa_thread_free(u->thread); pa_thread_free(u->thread);
} }
pa_thread_mq_done(&u->thread_mq); pa_thread_mq_done(&u->thread_mq);
if (u->sink)
if (u->sink) {
pa_sink_unref(u->sink); pa_sink_unref(u->sink);
}
if (u->rtpoll)
if (u->rtpoll) {
pa_rtpoll_free(u->rtpoll); pa_rtpoll_free(u->rtpoll);
}
pa_xfree(u); pa_xfree(u);
} }

+ 3
- 2
pulse-notes.txt View File

@ -8,7 +8,7 @@ pulseaudio --version
To build xrdp pulse sink, To build xrdp pulse sink,
get the pulse source that most closely matches your version on get the pulse source that most closely matches your version on
your machine. Get the source from
your machine. Get the source from
http://freedesktop.org/software/pulseaudio/releases/ http://freedesktop.org/software/pulseaudio/releases/
run ./configure after extracting. I don't think you need to build it. run ./configure after extracting. I don't think you need to build it.
edit Makefile to point to your pulse source directory. edit Makefile to point to your pulse source directory.
@ -22,5 +22,6 @@ PA always respawning
Make sure these lines are uncommented, like this: Make sure these lines are uncommented, like this:
autospawn = no autospawn = no
daemon-binary = /bin/true
daemon-binary = /bin/true
xfreerdp -a 24 -z --plugin rdpsnd --data alsa:hw:0,0 -- 127.0.0.1

Loading…
Cancel
Save