Buffering: tin cup. Update threading structure and handle rebuffer more reliably on buffer thread using a single message send.

git-svn-id: svn://svn.rockbox.org/rockbox/trunk@29303 a1c6a512-1295-4272-9138-f99709370657
This commit is contained in:
Michael Sevakis 2011-02-14 08:36:29 +00:00
parent 0fde635fb0
commit 6938255b6b

View file

@ -96,11 +96,9 @@ struct memory_handle {
enum data_type type; /* Type of data buffered with this handle */ enum data_type type; /* Type of data buffered with this handle */
char path[MAX_PATH]; /* Path if data originated in a file */ char path[MAX_PATH]; /* Path if data originated in a file */
int fd; /* File descriptor to path (-1 if closed) */ int fd; /* File descriptor to path (-1 if closed) */
size_t start; /* Start index of the handle's data buffer, size_t data; /* Start index of the handle's data buffer */
for use by reset_handle. */
size_t data; /* Start index of the handle's data */
volatile size_t ridx; /* Read pointer, relative to the main buffer */ volatile size_t ridx; /* Read pointer, relative to the main buffer */
size_t widx; /* Write pointer */ size_t widx; /* Write pointer, relative to the main buffer */
size_t filesize; /* File total length */ size_t filesize; /* File total length */
size_t filerem; /* Remaining bytes of file NOT in buffer */ size_t filerem; /* Remaining bytes of file NOT in buffer */
volatile size_t available; /* Available bytes to read from buffer */ volatile size_t available; /* Available bytes to read from buffer */
@ -109,6 +107,13 @@ struct memory_handle {
}; };
/* invariant: filesize == offset + available + filerem */ /* invariant: filesize == offset + available + filerem */
struct buf_message_data
{
int handle_id;
intptr_t data;
};
static char *buffer; static char *buffer;
static char *guard_buffer; static char *guard_buffer;
@ -133,14 +138,14 @@ static int num_handles; /* number of handles in the list */
static int base_handle_id; static int base_handle_id;
static struct mutex llist_mutex; /* Main lock for adding / removing handles */
static struct mutex llist_mod_mutex; static struct mutex llist_mutex SHAREDBSS_ATTR;
/* Handle cache (makes find_handle faster). /* Handle cache (makes find_handle faster).
This is global so that move_handle and rm_handle can invalidate it. */ This is global so that move_handle and rm_handle can invalidate it. */
static struct memory_handle *cached_handle = NULL; static struct memory_handle *cached_handle = NULL;
static struct { static struct data_counters {
size_t remaining; /* Amount of data needing to be buffered */ size_t remaining; /* Amount of data needing to be buffered */
size_t wasted; /* Amount of space available for freeing */ size_t wasted; /* Amount of space available for freeing */
size_t buffered; /* Amount of data currently in the buffer */ size_t buffered; /* Amount of data currently in the buffer */
@ -152,8 +157,8 @@ static struct {
enum { enum {
Q_BUFFER_HANDLE = 1, /* Request buffering of a handle, this should not be Q_BUFFER_HANDLE = 1, /* Request buffering of a handle, this should not be
used in a low buffer situation. */ used in a low buffer situation. */
Q_RESET_HANDLE, /* (internal) Request resetting of a handle to its Q_REBUFFER_HANDLE, /* Request reset and rebuffering of a handle at a new
offset (the offset has to be set beforehand) */ file starting position. */
Q_CLOSE_HANDLE, /* Request closing a handle */ Q_CLOSE_HANDLE, /* Request closing a handle */
Q_BASE_HANDLE, /* Set the reference handle for buf_useful_data */ Q_BASE_HANDLE, /* Set the reference handle for buf_useful_data */
@ -257,9 +262,6 @@ static struct memory_handle *add_handle(size_t data_size, bool can_wrap,
if (num_handles >= BUF_MAX_HANDLES) if (num_handles >= BUF_MAX_HANDLES)
return NULL; return NULL;
mutex_lock(&llist_mutex);
mutex_lock(&llist_mod_mutex);
widx = buf_widx; widx = buf_widx;
if (cur_handle && cur_handle->filerem > 0) { if (cur_handle && cur_handle->filerem > 0) {
@ -269,8 +271,6 @@ static struct memory_handle *add_handle(size_t data_size, bool can_wrap,
size_t req = cur_handle->filerem; size_t req = cur_handle->filerem;
if (ringbuf_add_cross(cur_handle->widx, req, buf_ridx) >= 0) { if (ringbuf_add_cross(cur_handle->widx, req, buf_ridx) >= 0) {
/* Not enough space to finish allocation */ /* Not enough space to finish allocation */
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return NULL; return NULL;
} else { } else {
/* Allocate the remainder of the space for the current handle */ /* Allocate the remainder of the space for the current handle */
@ -298,8 +298,6 @@ static struct memory_handle *add_handle(size_t data_size, bool can_wrap,
overlap = ringbuf_add_cross(widx, shift + len, buf_ridx); overlap = ringbuf_add_cross(widx, shift + len, buf_ridx);
if (overlap >= 0 && (alloc_all || (size_t)overlap >= data_size)) { if (overlap >= 0 && (alloc_all || (size_t)overlap >= data_size)) {
/* Not enough space for required allocations */ /* Not enough space for required allocations */
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return NULL; return NULL;
} }
@ -310,6 +308,9 @@ static struct memory_handle *add_handle(size_t data_size, bool can_wrap,
struct memory_handle *new_handle = struct memory_handle *new_handle =
(struct memory_handle *)(&buffer[buf_widx]); (struct memory_handle *)(&buffer[buf_widx]);
/* Prevent buffering thread from looking at it */
new_handle->filerem = 0;
/* only advance the buffer write index of the size of the struct */ /* only advance the buffer write index of the size of the struct */
buf_widx = ringbuf_add(buf_widx, sizeof(struct memory_handle)); buf_widx = ringbuf_add(buf_widx, sizeof(struct memory_handle));
@ -328,8 +329,6 @@ static struct memory_handle *add_handle(size_t data_size, bool can_wrap,
cur_handle = new_handle; cur_handle = new_handle;
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return new_handle; return new_handle;
} }
@ -340,9 +339,6 @@ static bool rm_handle(const struct memory_handle *h)
if (h == NULL) if (h == NULL)
return true; return true;
mutex_lock(&llist_mutex);
mutex_lock(&llist_mod_mutex);
if (h == first_handle) { if (h == first_handle) {
first_handle = h->next; first_handle = h->next;
if (h == cur_handle) { if (h == cur_handle) {
@ -366,8 +362,6 @@ static bool rm_handle(const struct memory_handle *h)
buf_widx = cur_handle->widx; buf_widx = cur_handle->widx;
} }
} else { } else {
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return false; return false;
} }
} }
@ -377,9 +371,6 @@ static bool rm_handle(const struct memory_handle *h)
cached_handle = NULL; cached_handle = NULL;
num_handles--; num_handles--;
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return true; return true;
} }
@ -390,19 +381,15 @@ static struct memory_handle *find_handle(int handle_id)
if (handle_id < 0) if (handle_id < 0)
return NULL; return NULL;
mutex_lock(&llist_mutex);
/* simple caching because most of the time the requested handle /* simple caching because most of the time the requested handle
will either be the same as the last, or the one after the last */ will either be the same as the last, or the one after the last */
if (cached_handle) if (cached_handle)
{ {
if (cached_handle->id == handle_id) { if (cached_handle->id == handle_id) {
mutex_unlock(&llist_mutex);
return cached_handle; return cached_handle;
} else if (cached_handle->next && } else if (cached_handle->next &&
(cached_handle->next->id == handle_id)) { (cached_handle->next->id == handle_id)) {
cached_handle = cached_handle->next; cached_handle = cached_handle->next;
mutex_unlock(&llist_mutex);
return cached_handle; return cached_handle;
} }
} }
@ -415,7 +402,6 @@ static struct memory_handle *find_handle(int handle_id)
if (m) if (m)
cached_handle = m; cached_handle = m;
mutex_unlock(&llist_mutex);
return m; return m;
} }
@ -449,9 +435,6 @@ static bool move_handle(struct memory_handle **h, size_t *delta,
return false; return false;
} }
mutex_lock(&llist_mutex);
mutex_lock(&llist_mod_mutex);
oldpos = ringbuf_offset(src); oldpos = ringbuf_offset(src);
newpos = ringbuf_add(oldpos, final_delta); newpos = ringbuf_add(oldpos, final_delta);
overlap = ringbuf_add_cross(newpos, size_to_move, buffer_len); overlap = ringbuf_add_cross(newpos, size_to_move, buffer_len);
@ -477,8 +460,6 @@ static bool move_handle(struct memory_handle **h, size_t *delta,
correction = (correction + 3) & ~3; correction = (correction + 3) & ~3;
if (final_delta < correction + sizeof(struct memory_handle)) { if (final_delta < correction + sizeof(struct memory_handle)) {
/* Delta cannot end up less than the size of the struct */ /* Delta cannot end up less than the size of the struct */
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return false; return false;
} }
newpos -= correction; newpos -= correction;
@ -500,8 +481,6 @@ static bool move_handle(struct memory_handle **h, size_t *delta,
if (m && m->next == src) { if (m && m->next == src) {
m->next = dest; m->next = dest;
} else { } else {
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return false; return false;
} }
} }
@ -562,8 +541,6 @@ static bool move_handle(struct memory_handle **h, size_t *delta,
/* Update the caller with the new location of h and the distance moved */ /* Update the caller with the new location of h and the distance moved */
*h = dest; *h = dest;
*delta = final_delta; *delta = final_delta;
mutex_unlock(&llist_mod_mutex);
mutex_unlock(&llist_mutex);
return true; return true;
} }
@ -575,7 +552,6 @@ BUFFER SPACE MANAGEMENT
update_data_counters: Updates the values in data_counters update_data_counters: Updates the values in data_counters
buffer_is_low : Returns true if the amount of useful data in the buffer is low buffer_is_low : Returns true if the amount of useful data in the buffer is low
buffer_handle : Buffer data for a handle buffer_handle : Buffer data for a handle
reset_handle : Reset write position and data buffer of a handle to its offset
rebuffer_handle : Seek to a nonbuffered part of a handle by rebuffering the data rebuffer_handle : Seek to a nonbuffered part of a handle by rebuffering the data
shrink_handle : Free buffer space by moving a handle shrink_handle : Free buffer space by moving a handle
fill_buffer : Call buffer_handle for all handles that have data to buffer fill_buffer : Call buffer_handle for all handles that have data to buffer
@ -583,18 +559,24 @@ fill_buffer : Call buffer_handle for all handles that have data to buffer
These functions are used by the buffering thread to manage buffer space. These functions are used by the buffering thread to manage buffer space.
*/ */
static void update_data_counters(void) static void update_data_counters(struct data_counters *dc)
{ {
struct memory_handle *m = find_handle(base_handle_id);
bool is_useful = m==NULL;
size_t buffered = 0; size_t buffered = 0;
size_t wasted = 0; size_t wasted = 0;
size_t remaining = 0; size_t remaining = 0;
size_t useful = 0; size_t useful = 0;
struct memory_handle *m;
bool is_useful;
if (dc == NULL)
dc = &data_counters;
mutex_lock(&llist_mutex); mutex_lock(&llist_mutex);
m = find_handle(base_handle_id);
is_useful = m == NULL;
m = first_handle; m = first_handle;
while (m) { while (m) {
buffered += m->available; buffered += m->available;
@ -612,21 +594,21 @@ static void update_data_counters(void)
mutex_unlock(&llist_mutex); mutex_unlock(&llist_mutex);
data_counters.buffered = buffered; dc->buffered = buffered;
data_counters.wasted = wasted; dc->wasted = wasted;
data_counters.remaining = remaining; dc->remaining = remaining;
data_counters.useful = useful; dc->useful = useful;
} }
static inline bool buffer_is_low(void) static inline bool buffer_is_low(void)
{ {
update_data_counters(); update_data_counters(NULL);
return data_counters.useful < (conf_watermark / 2); return data_counters.useful < (conf_watermark / 2);
} }
/* Buffer data for the given handle. /* Q_BUFFER_HANDLE event and buffer data for the given handle.
Return whether or not the buffering should continue explicitly. */ Return whether or not the buffering should continue explicitly. */
static bool buffer_handle(int handle_id) static bool buffer_handle(int handle_id, size_t to_buffer)
{ {
logf("buffer_handle(%d)", handle_id); logf("buffer_handle(%d)", handle_id);
struct memory_handle *h = find_handle(handle_id); struct memory_handle *h = find_handle(handle_id);
@ -671,7 +653,7 @@ static bool buffer_handle(int handle_id)
h->filerem = 0; h->filerem = 0;
h->available = sizeof(struct mp3entry); h->available = sizeof(struct mp3entry);
h->widx += sizeof(struct mp3entry); h->widx += sizeof(struct mp3entry);
send_event(BUFFER_EVENT_FINISHED, &h->id); send_event(BUFFER_EVENT_FINISHED, &handle_id);
return true; return true;
} }
@ -730,113 +712,53 @@ static bool buffer_handle(int handle_id)
yield(); yield();
} }
if (!queue_empty(&buffering_queue)) if (to_buffer == 0)
break; {
/* Normal buffering - check queue */
if(!queue_empty(&buffering_queue))
break;
}
else
{
if (to_buffer <= (size_t)rc)
break; /* Done */
to_buffer -= rc;
}
} }
if (h->filerem == 0) { if (h->filerem == 0) {
/* finished buffering the file */ /* finished buffering the file */
close(h->fd); close(h->fd);
h->fd = -1; h->fd = -1;
send_event(BUFFER_EVENT_FINISHED, &h->id); send_event(BUFFER_EVENT_FINISHED, &handle_id);
} }
return !stop; return !stop;
} }
/* Reset writing position and data buffer of a handle to its current offset. /* Close the specified handle id and free its allocation. */
Use this after having set the new offset to use. */
static void reset_handle(int handle_id)
{
size_t new_index;
logf("reset_handle(%d)", handle_id);
struct memory_handle *h = find_handle(handle_id);
if (!h)
return;
new_index = h->start;
#ifdef STORAGE_WANTS_ALIGN
/* Align to desired storage alignment if space permits - handle could have
been shrunken too close to the following one after a previous rebuffer. */
size_t alignment_pad = STORAGE_OVERLAP(h->offset - (size_t)(&buffer[new_index]));
size_t offset = h->next ? ringbuf_offset(h->next) : buf_ridx;
if (ringbuf_add_cross(new_index, alignment_pad, offset) >= 0) {
/* Forego storage alignment this time */
alignment_pad = 0;
}
new_index = ringbuf_add(new_index, alignment_pad);
#endif
h->ridx = h->widx = h->data = new_index;
if (h == cur_handle)
buf_widx = new_index;
h->available = 0;
h->filerem = h->filesize - h->offset;
if (h->fd >= 0) {
lseek(h->fd, h->offset, SEEK_SET);
}
}
/* Seek to a nonbuffered part of a handle by rebuffering the data. */
static void rebuffer_handle(int handle_id, size_t newpos)
{
struct memory_handle *h = find_handle(handle_id);
if (!h)
return;
/* When seeking foward off of the buffer, if it is a short seek don't
rebuffer the whole track, just read enough to satisfy */
if (newpos > h->offset && newpos - h->offset < BUFFERING_DEFAULT_FILECHUNK)
{
LOGFQUEUE("buffering >| Q_BUFFER_HANDLE %d", handle_id);
queue_send(&buffering_queue, Q_BUFFER_HANDLE, handle_id);
h->ridx = ringbuf_add(h->data, newpos - h->offset);
return;
}
h->offset = newpos;
/* Reset the handle to its new offset */
LOGFQUEUE("buffering >| Q_RESET_HANDLE %d", handle_id);
queue_send(&buffering_queue, Q_RESET_HANDLE, handle_id);
uintptr_t next = ringbuf_offset(h->next);
if (ringbuf_sub(next, h->data) < h->filesize - newpos)
{
/* There isn't enough space to rebuffer all of the track from its new
offset, so we ask the user to free some */
DEBUGF("%s(): space is needed\n", __func__);
send_event(BUFFER_EVENT_REBUFFER, &handle_id);
}
/* Now we ask for a rebuffer */
LOGFQUEUE("buffering >| Q_BUFFER_HANDLE %d", handle_id);
queue_send(&buffering_queue, Q_BUFFER_HANDLE, handle_id);
}
static bool close_handle(int handle_id) static bool close_handle(int handle_id)
{ {
struct memory_handle *h = find_handle(handle_id); bool retval = true;
struct memory_handle *h;
mutex_lock(&llist_mutex);
h = find_handle(handle_id);
/* If the handle is not found, it is closed */ /* If the handle is not found, it is closed */
if (!h) if (h) {
return true; if (h->fd >= 0) {
close(h->fd);
h->fd = -1;
}
if (h->fd >= 0) { /* rm_handle returns true unless the handle somehow persists after
close(h->fd); exit */
h->fd = -1; retval = rm_handle(h);
} }
/* rm_handle returns true unless the handle somehow persists after exit */ mutex_unlock(&llist_mutex);
return rm_handle(h); return retval;
} }
/* Free buffer space by moving the handle struct right before the useful /* Free buffer space by moving the handle struct right before the useful
@ -888,7 +810,6 @@ static void shrink_handle(struct memory_handle *h)
return; return;
h->data = ringbuf_add(h->data, delta); h->data = ringbuf_add(h->data, delta);
h->start = ringbuf_add(h->start, delta);
h->available -= delta; h->available -= delta;
h->offset += delta; h->offset += delta;
} }
@ -900,12 +821,13 @@ static void shrink_handle(struct memory_handle *h)
static bool fill_buffer(void) static bool fill_buffer(void)
{ {
logf("fill_buffer()"); logf("fill_buffer()");
struct memory_handle *m; struct memory_handle *m = first_handle;
shrink_handle(first_handle);
m = first_handle; shrink_handle(m);
while (queue_empty(&buffering_queue) && m) { while (queue_empty(&buffering_queue) && m) {
if (m->filerem > 0) { if (m->filerem > 0) {
if (!buffer_handle(m->id)) { if (!buffer_handle(m->id, 0)) {
m = NULL; m = NULL;
break; break;
} }
@ -990,7 +912,7 @@ management functions for all the actual handle management work.
/* Reserve space in the buffer for a file. /* Reserve space in the buffer for a file.
filename: name of the file to open filename: name of the file to open
offset: offset at which to start buffering the file, useful when the first offset: offset at which to start buffering the file, useful when the first
(offset-1) bytes of the file aren't needed. offset bytes of the file aren't needed.
type: one of the data types supported (audio, image, cuesheet, others type: one of the data types supported (audio, image, cuesheet, others
user_data: user data passed possibly passed in subcalls specific to a user_data: user data passed possibly passed in subcalls specific to a
data_type (only used for image (albumart) buffering so far ) data_type (only used for image (albumart) buffering so far )
@ -1004,33 +926,40 @@ int bufopen(const char *file, size_t offset, enum data_type type,
/* currently only used for aa loading */ /* currently only used for aa loading */
(void)user_data; (void)user_data;
#endif #endif
int handle_id = ERR_BUFFER_FULL;
/* No buffer refs until after the mutex_lock call! */
if (type == TYPE_ID3) if (type == TYPE_ID3)
{ {
/* ID3 case: allocate space, init the handle and return. */ /* ID3 case: allocate space, init the handle and return. */
mutex_lock(&llist_mutex);
struct memory_handle *h = add_handle(sizeof(struct mp3entry), false, true); struct memory_handle *h = add_handle(sizeof(struct mp3entry), false, true);
if (!h) if (h)
return ERR_BUFFER_FULL; {
handle_id = h->id;
h->fd = -1;
h->filesize = sizeof(struct mp3entry);
h->offset = 0;
h->data = buf_widx;
h->ridx = buf_widx;
h->widx = buf_widx;
h->available = 0;
h->type = type;
strlcpy(h->path, file, MAX_PATH);
h->fd = -1; buf_widx += sizeof(struct mp3entry); /* safe because the handle
h->filesize = sizeof(struct mp3entry); can't wrap */
h->filerem = sizeof(struct mp3entry); h->filerem = sizeof(struct mp3entry);
h->offset = 0;
h->data = buf_widx;
h->ridx = buf_widx;
h->widx = buf_widx;
h->available = 0;
h->type = type;
strlcpy(h->path, file, MAX_PATH);
buf_widx += sizeof(struct mp3entry); /* safe because the handle /* Inform the buffering thread that we added a handle */
can't wrap */ LOGFQUEUE("buffering > Q_HANDLE_ADDED %d", handle_id);
queue_post(&buffering_queue, Q_HANDLE_ADDED, handle_id);
}
/* Inform the buffering thread that we added a handle */ mutex_unlock(&llist_mutex);
LOGFQUEUE("buffering > Q_HANDLE_ADDED %d", h->id); return handle_id;
queue_post(&buffering_queue, Q_HANDLE_ADDED, h->id);
return h->id;
} }
#ifdef APPLICATION #ifdef APPLICATION
/* loading code from memory is not supported in application builds */ /* loading code from memory is not supported in application builds */
@ -1062,37 +991,40 @@ int bufopen(const char *file, size_t offset, enum data_type type,
/* Reserve extra space because alignment can move data forward */ /* Reserve extra space because alignment can move data forward */
size_t padded_size = STORAGE_PAD(size-adjusted_offset); size_t padded_size = STORAGE_PAD(size-adjusted_offset);
mutex_lock(&llist_mutex);
struct memory_handle *h = add_handle(padded_size, can_wrap, false); struct memory_handle *h = add_handle(padded_size, can_wrap, false);
if (!h) if (!h)
{ {
DEBUGF("%s(): failed to add handle\n", __func__); DEBUGF("%s(): failed to add handle\n", __func__);
mutex_unlock(&llist_mutex);
close(fd); close(fd);
return ERR_BUFFER_FULL; return ERR_BUFFER_FULL;
} }
handle_id = h->id;
strlcpy(h->path, file, MAX_PATH); strlcpy(h->path, file, MAX_PATH);
h->offset = adjusted_offset; h->offset = adjusted_offset;
#ifdef STORAGE_WANTS_ALIGN
/* Don't bother to storage align bitmaps because they are not /* Don't bother to storage align bitmaps because they are not
* loaded directly into the buffer. * loaded directly into the buffer.
*/ */
if (type != TYPE_BITMAP) if (type != TYPE_BITMAP)
{ {
size_t alignment_pad;
/* Remember where data area starts, for use by reset_handle */
h->start = buf_widx;
/* Align to desired storage alignment */ /* Align to desired storage alignment */
alignment_pad = STORAGE_OVERLAP(adjusted_offset - (size_t)(&buffer[buf_widx])); size_t alignment_pad = STORAGE_OVERLAP(adjusted_offset -
(size_t)(&buffer[buf_widx]));
buf_widx = ringbuf_add(buf_widx, alignment_pad); buf_widx = ringbuf_add(buf_widx, alignment_pad);
} }
#endif /* STORAGE_WANTS_ALIGN */
h->fd = -1;
h->data = buf_widx;
h->ridx = buf_widx; h->ridx = buf_widx;
h->widx = buf_widx; h->widx = buf_widx;
h->data = buf_widx;
h->available = 0; h->available = 0;
h->filerem = 0;
h->type = type; h->type = type;
#ifdef HAVE_ALBUMART #ifdef HAVE_ALBUMART
@ -1100,47 +1032,55 @@ int bufopen(const char *file, size_t offset, enum data_type type,
{ {
/* Bitmap file: we load the data instead of the file */ /* Bitmap file: we load the data instead of the file */
int rc; int rc;
mutex_lock(&llist_mod_mutex); /* Lock because load_bitmap yields */
rc = load_image(fd, file, (struct bufopen_bitmap_data*)user_data); rc = load_image(fd, file, (struct bufopen_bitmap_data*)user_data);
mutex_unlock(&llist_mod_mutex);
if (rc <= 0) if (rc <= 0)
{ {
rm_handle(h); rm_handle(h);
close(fd); handle_id = ERR_FILE_ERROR;
return ERR_FILE_ERROR; }
else
{
h->filesize = rc;
h->available = rc;
h->widx = buf_widx + rc; /* safe because the data doesn't wrap */
buf_widx += rc; /* safe too */
} }
h->filerem = 0;
h->filesize = rc;
h->available = rc;
h->widx = buf_widx + rc; /* safe because the data doesn't wrap */
buf_widx += rc; /* safe too */
} }
else else
#endif #endif
{ {
h->filerem = size - adjusted_offset; if (type == TYPE_CUESHEET)
h->fd = fd;
h->filesize = size; h->filesize = size;
h->available = 0; h->available = 0;
h->widx = buf_widx; h->widx = buf_widx;
h->filerem = size - adjusted_offset;
} }
if (type == TYPE_CUESHEET) { mutex_unlock(&llist_mutex);
h->fd = fd;
if (type == TYPE_CUESHEET)
{
/* Immediately start buffering those */ /* Immediately start buffering those */
LOGFQUEUE("buffering >| Q_BUFFER_HANDLE %d", h->id); LOGFQUEUE("buffering >| Q_BUFFER_HANDLE %d", handle_id);
queue_send(&buffering_queue, Q_BUFFER_HANDLE, h->id); queue_send(&buffering_queue, Q_BUFFER_HANDLE, handle_id);
} else { }
else
{
/* Other types will get buffered in the course of normal operations */ /* Other types will get buffered in the course of normal operations */
h->fd = -1;
close(fd); close(fd);
/* Inform the buffering thread that we added a handle */ if (handle_id >= 0)
LOGFQUEUE("buffering > Q_HANDLE_ADDED %d", h->id); {
queue_post(&buffering_queue, Q_HANDLE_ADDED, h->id); /* Inform the buffering thread that we added a handle */
LOGFQUEUE("buffering > Q_HANDLE_ADDED %d", handle_id);
queue_post(&buffering_queue, Q_HANDLE_ADDED, handle_id);
}
} }
logf("bufopen: new hdl %d", h->id); logf("bufopen: new hdl %d", handle_id);
return h->id; return handle_id;
} }
/* Open a new handle from data that needs to be copied from memory. /* Open a new handle from data that needs to be copied from memory.
@ -1152,36 +1092,43 @@ int bufopen(const char *file, size_t offset, enum data_type type,
*/ */
int bufalloc(const void *src, size_t size, enum data_type type) int bufalloc(const void *src, size_t size, enum data_type type)
{ {
int handle_id = ERR_BUFFER_FULL;
mutex_lock(&llist_mutex);
struct memory_handle *h = add_handle(size, false, true); struct memory_handle *h = add_handle(size, false, true);
if (!h) if (h)
return ERR_BUFFER_FULL; {
handle_id = h->id;
if (src) { if (src) {
if (type == TYPE_ID3 && size == sizeof(struct mp3entry)) { if (type == TYPE_ID3 && size == sizeof(struct mp3entry)) {
/* specially take care of struct mp3entry */ /* specially take care of struct mp3entry */
copy_mp3entry((struct mp3entry *)&buffer[buf_widx], copy_mp3entry((struct mp3entry *)&buffer[buf_widx],
(const struct mp3entry *)src); (const struct mp3entry *)src);
} else { } else {
memcpy(&buffer[buf_widx], src, size); memcpy(&buffer[buf_widx], src, size);
}
} }
h->fd = -1;
*h->path = 0;
h->filesize = size;
h->offset = 0;
h->ridx = buf_widx;
h->widx = buf_widx + size; /* this is safe because the data doesn't wrap */
h->data = buf_widx;
h->available = size;
h->type = type;
buf_widx += size; /* safe too */
} }
h->fd = -1; mutex_unlock(&llist_mutex);
*h->path = 0;
h->filesize = size;
h->filerem = 0;
h->offset = 0;
h->ridx = buf_widx;
h->widx = buf_widx + size; /* this is safe because the data doesn't wrap */
h->data = buf_widx;
h->available = size;
h->type = type;
buf_widx += size; /* safe too */ logf("bufalloc: new hdl %d", handle_id);
return handle_id;
logf("bufalloc: new hdl %d", h->id);
return h->id;
} }
/* Close the handle. Return true for success and false for failure */ /* Close the handle. Return true for success and false for failure */
@ -1193,6 +1140,102 @@ bool bufclose(int handle_id)
return queue_send(&buffering_queue, Q_CLOSE_HANDLE, handle_id); return queue_send(&buffering_queue, Q_CLOSE_HANDLE, handle_id);
} }
/* Backend to bufseek and bufadvance. Call only in response to
Q_REBUFFER_HANDLE! */
static void rebuffer_handle(int handle_id, size_t newpos)
{
struct memory_handle *h = find_handle(handle_id);
if (!h)
{
queue_reply(&buffering_queue, ERR_HANDLE_NOT_FOUND);
return;
}
/* When seeking foward off of the buffer, if it is a short seek attempt to
avoid rebuffering the whole track, just read enough to satisfy */
if (newpos > h->offset && newpos - h->offset < BUFFERING_DEFAULT_FILECHUNK)
{
size_t amount = newpos - h->offset;
h->ridx = ringbuf_add(h->data, amount);
if (buffer_handle(handle_id, amount + 1))
{
queue_reply(&buffering_queue, 0);
buffer_handle(handle_id, 0); /* Ok, try the rest */
return;
}
/* Data collision - must reset */
}
/* Reset the handle to its new position */
h->offset = newpos;
size_t next = h->next ? ringbuf_offset(h->next) : buf_ridx;
#ifdef STORAGE_WANTS_ALIGN
/* Strip alignment padding then redo */
size_t new_index = ringbuf_add(ringbuf_offset(h), sizeof (*h));
/* Align to desired storage alignment if space permits - handle could have
been shrunken too close to the following one after a previous rebuffer. */
size_t alignment_pad =
STORAGE_OVERLAP(h->offset - (size_t)(&buffer[new_index]));
if (ringbuf_add_cross(new_index, alignment_pad, next) >= 0)
alignment_pad = 0; /* Forego storage alignment this time */
new_index = ringbuf_add(new_index, alignment_pad);
#else
/* Just clear the data buffer */
size_t new_index = h->data;
#endif /* STORAGE_WANTS_ALIGN */
h->ridx = h->widx = h->data = new_index;
if (h == cur_handle)
buf_widx = new_index;
h->available = 0;
h->filerem = h->filesize - h->offset;
if (h->fd >= 0)
lseek(h->fd, h->offset, SEEK_SET);
if (h->next && ringbuf_sub(next, h->data) <= h->filesize - newpos)
{
/* There isn't enough space to rebuffer all of the track from its new
offset, so we ask the user to free some */
DEBUGF("%s(): space is needed\n", __func__);
int hid = handle_id;
send_event(BUFFER_EVENT_REBUFFER, &hid);
}
/* Now we do the rebuffer */
queue_reply(&buffering_queue, 0);
buffer_handle(handle_id, 0);
}
/* Backend to bufseek and bufadvance */
static int seek_handle(struct memory_handle *h, size_t newpos)
{
if (newpos > h->filesize) {
/* access beyond the end of the file */
return ERR_INVALID_VALUE;
}
else if ((newpos < h->offset || h->offset + h->available <= newpos) &&
(newpos < h->filesize || h->filerem > 0)) {
/* access before or after buffered data and not to end of file or file
is not buffered to the end-- a rebuffer is needed. */
struct buf_message_data parm = { h->id, newpos };
return queue_send(&buffering_queue, Q_REBUFFER_HANDLE,
(intptr_t)&parm);
}
else {
h->ridx = ringbuf_add(h->data, newpos - h->offset);
}
return 0;
}
/* Set reading index in handle (relatively to the start of the file). /* Set reading index in handle (relatively to the start of the file).
Access before the available data will trigger a rebuffer. Access before the available data will trigger a rebuffer.
Return 0 for success and < 0 for failure: Return 0 for success and < 0 for failure:
@ -1205,44 +1248,47 @@ int bufseek(int handle_id, size_t newpos)
if (!h) if (!h)
return ERR_HANDLE_NOT_FOUND; return ERR_HANDLE_NOT_FOUND;
if (newpos > h->filesize) { return seek_handle(h, newpos);
/* access beyond the end of the file */
return ERR_INVALID_VALUE;
}
else if (newpos < h->offset || h->offset + h->available < newpos) {
/* access before or after buffered data. A rebuffer is needed. */
rebuffer_handle(handle_id, newpos);
}
else {
h->ridx = ringbuf_add(h->data, newpos - h->offset);
}
return 0;
} }
/* Advance the reading index in a handle (relatively to its current position). /* Advance the reading index in a handle (relatively to its current position).
Return 0 for success and < 0 for failure */ Return 0 for success and < 0 for failure */
int bufadvance(int handle_id, off_t offset) int bufadvance(int handle_id, off_t offset)
{ {
const struct memory_handle *h = find_handle(handle_id); struct memory_handle *h = find_handle(handle_id);
if (!h) if (!h)
return ERR_HANDLE_NOT_FOUND; return ERR_HANDLE_NOT_FOUND;
size_t newpos = h->offset + ringbuf_sub(h->ridx, h->data) + offset; size_t newpos = h->offset + ringbuf_sub(h->ridx, h->data) + offset;
return bufseek(handle_id, newpos); return seek_handle(h, newpos);
} }
/* Used by bufread and bufgetdata to prepare the buffer and retrieve the /* Used by bufread and bufgetdata to prepare the buffer and retrieve the
* actual amount of data available for reading. This function explicitly * actual amount of data available for reading. This function explicitly
* does not check the validity of the input handle. It does do range checks * does not check the validity of the input handle. It does do range checks
* on size and returns a valid (and explicit) amount of data for reading */ * on size and returns a valid (and explicit) amount of data for reading */
static size_t handle_size_available(const struct memory_handle *h)
{
/* Obtain proper distances from data start */
size_t rd = ringbuf_sub(h->ridx, h->data);
size_t wr = ringbuf_sub(h->widx, h->data);
if (LIKELY(wr > rd))
return wr - rd;
return 0; /* ridx is ahead of or equal to widx at this time */
}
static struct memory_handle *prep_bufdata(int handle_id, size_t *size, static struct memory_handle *prep_bufdata(int handle_id, size_t *size,
bool guardbuf_limit) bool guardbuf_limit)
{ {
struct memory_handle *h = find_handle(handle_id); struct memory_handle *h = find_handle(handle_id);
size_t realsize;
if (!h) if (!h)
return NULL; return NULL;
size_t avail = ringbuf_sub(h->widx, h->ridx); size_t avail = handle_size_available(h);
if (avail == 0 && h->filerem == 0) if (avail == 0 && h->filerem == 0)
{ {
@ -1251,40 +1297,52 @@ static struct memory_handle *prep_bufdata(int handle_id, size_t *size,
return h; return h;
} }
if (*size == 0 || *size > avail + h->filerem) realsize = *size;
*size = avail + h->filerem;
if (guardbuf_limit && h->type == TYPE_PACKET_AUDIO && *size > GUARD_BUFSIZE) if (realsize == 0 || realsize > avail + h->filerem)
realsize = avail + h->filerem;
if (guardbuf_limit && h->type == TYPE_PACKET_AUDIO
&& realsize > GUARD_BUFSIZE)
{ {
logf("data request > guardbuf"); logf("data request > guardbuf");
/* If more than the size of the guardbuf is requested and this is a /* If more than the size of the guardbuf is requested and this is a
* bufgetdata, limit to guard_bufsize over the end of the buffer */ * bufgetdata, limit to guard_bufsize over the end of the buffer */
*size = MIN(*size, buffer_len - h->ridx + GUARD_BUFSIZE); realsize = MIN(realsize, buffer_len - h->ridx + GUARD_BUFSIZE);
/* this ensures *size <= buffer_len - h->ridx + GUARD_BUFSIZE */ /* this ensures *size <= buffer_len - h->ridx + GUARD_BUFSIZE */
} }
if (h->filerem > 0 && avail < *size) if (h->filerem > 0 && avail < realsize)
{ {
/* Data isn't ready. Request buffering */ /* Data isn't ready. Request buffering */
buf_request_buffer_handle(handle_id); buf_request_buffer_handle(handle_id);
/* Wait for the data to be ready */ /* Wait for the data to be ready */
do do
{ {
sleep(1); sleep(0);
/* it is not safe for a non-buffering thread to sleep while /* it is not safe for a non-buffering thread to sleep while
* holding a handle */ * holding a handle */
h = find_handle(handle_id); h = find_handle(handle_id);
if (!h) if (!h)
return NULL; return NULL;
avail = ringbuf_sub(h->widx, h->ridx); avail = handle_size_available(h);
} }
while (h->filerem > 0 && avail < *size); while (h->filerem > 0 && avail < realsize);
} }
*size = MIN(*size,avail); *size = MIN(realsize, avail);
return h; return h;
} }
/* Note: It is safe for the thread responsible for handling the rebuffer
* cleanup request to call bufread or bufgetdata only when the data will
* be available-- not if it could be blocked waiting for it in prep_bufdata.
* It should be apparent that if said thread is being forced to wait for
* buffering but has not yet responded to the cleanup request, the space
* can never be cleared to allow further reading of the file because it is
* not listening to callbacks any longer. */
/* Copy data from the given handle to the dest buffer. /* Copy data from the given handle to the dest buffer.
Return the number of bytes copied or < 0 for failure (handle not found). Return the number of bytes copied or < 0 for failure (handle not found).
The caller is blocked until the requested amount of data is available. The caller is blocked until the requested amount of data is available.
@ -1481,6 +1539,7 @@ void buffering_thread(void)
{ {
bool filling = false; bool filling = false;
struct queue_event ev; struct queue_event ev;
struct buf_message_data *parm;
while (true) while (true)
{ {
@ -1499,19 +1558,20 @@ void buffering_thread(void)
send_event(BUFFER_EVENT_BUFFER_LOW, 0); send_event(BUFFER_EVENT_BUFFER_LOW, 0);
shrink_buffer(); shrink_buffer();
queue_reply(&buffering_queue, 1); queue_reply(&buffering_queue, 1);
filling |= buffer_handle((int)ev.data); filling |= buffer_handle((int)ev.data, 0);
break; break;
case Q_BUFFER_HANDLE: case Q_BUFFER_HANDLE:
LOGFQUEUE("buffering < Q_BUFFER_HANDLE %d", (int)ev.data); LOGFQUEUE("buffering < Q_BUFFER_HANDLE %d", (int)ev.data);
queue_reply(&buffering_queue, 1); queue_reply(&buffering_queue, 1);
buffer_handle((int)ev.data); buffer_handle((int)ev.data, 0);
break; break;
case Q_RESET_HANDLE: case Q_REBUFFER_HANDLE:
LOGFQUEUE("buffering < Q_RESET_HANDLE %d", (int)ev.data); parm = (struct buf_message_data *)ev.data;
queue_reply(&buffering_queue, 1); LOGFQUEUE("buffering < Q_REBUFFER_HANDLE %d %ld",
reset_handle((int)ev.data); parm->handle_id, parm->data);
rebuffer_handle(parm->handle_id, parm->data);
break; break;
case Q_CLOSE_HANDLE: case Q_CLOSE_HANDLE:
@ -1543,7 +1603,7 @@ void buffering_thread(void)
break; break;
} }
update_data_counters(); update_data_counters(NULL);
/* If the buffer is low, call the callbacks to get new data */ /* If the buffer is low, call the callbacks to get new data */
if (num_handles > 0 && data_counters.useful <= conf_watermark) if (num_handles > 0 && data_counters.useful <= conf_watermark)
@ -1565,7 +1625,7 @@ void buffering_thread(void)
if (!filling) if (!filling)
shrink_buffer(); shrink_buffer();
filling = fill_buffer(); filling = fill_buffer();
update_data_counters(); update_data_counters(NULL);
} }
} }
#endif #endif
@ -1593,12 +1653,6 @@ void buffering_thread(void)
void buffering_init(void) void buffering_init(void)
{ {
mutex_init(&llist_mutex); mutex_init(&llist_mutex);
mutex_init(&llist_mod_mutex);
#ifdef HAVE_PRIORITY_SCHEDULING
/* This behavior not safe atm */
mutex_set_preempt(&llist_mutex, false);
mutex_set_preempt(&llist_mod_mutex, false);
#endif
conf_watermark = BUFFERING_DEFAULT_WATERMARK; conf_watermark = BUFFERING_DEFAULT_WATERMARK;
@ -1648,11 +1702,12 @@ bool buffering_reset(char *buf, size_t buflen)
void buffering_get_debugdata(struct buffering_debug *dbgdata) void buffering_get_debugdata(struct buffering_debug *dbgdata)
{ {
update_data_counters(); struct data_counters dc;
update_data_counters(&dc);
dbgdata->num_handles = num_handles; dbgdata->num_handles = num_handles;
dbgdata->data_rem = data_counters.remaining; dbgdata->data_rem = dc.remaining;
dbgdata->wasted_space = data_counters.wasted; dbgdata->wasted_space = dc.wasted;
dbgdata->buffered_data = data_counters.buffered; dbgdata->buffered_data = dc.buffered;
dbgdata->useful_data = data_counters.useful; dbgdata->useful_data = dc.useful;
dbgdata->watermark = conf_watermark; dbgdata->watermark = conf_watermark;
} }