shithub: dav1d

Download patch

ref: 3f410fd9030e9bd1f753ca4718f0210570751787
parent: c1b0808c4035bd0d62d7bfd66d0befe220ec1e52
author: Ronald S. Bultje <rsbultje@gmail.com>
date: Sat Nov 24 13:20:39 EST 2018

Rewrite flushing logic

The old flushing logic would simply leave frame threads (and tile
threads) running without caring how much latency that might impose
in the post-seek time-to-first-frame. This commit adds a 'flush'
state that makes all running frame/tile threads abort decoding of
their current frame, and disposes of all frames in the output
queue.

Then, we use dav1d_flush() in dav1d_close() to abort running threads
on exit, instead of signaling their respective dependents (which was
previously done to prevent deadlocks). The advantage of this approach
is that we don't signal on objects we don't have ownership over, and
thus this should prevent race conditions where the owning thread could
dispose of the object just as we're signaling it, which I believe is
what causes #193.

--- a/src/decode.c
+++ b/src/decode.c
@@ -2368,6 +2368,8 @@
              t->a = f->a + col_sb128_start + tile_row * f->sb128w;
              t->bx < ts->tiling.col_end; t->bx += sb_step)
         {
+            if (atomic_load_explicit(&t->tile_thread.flush, memory_order_acquire))
+                return 1;
             if (decode_sb(t, root_bl, c->intra_edge.root[root_bl]))
                 return 1;
             if (t->bx & 16 || f->seq_hdr.sb128)
@@ -2397,6 +2399,8 @@
          t->lf_mask = f->lf.mask + sb128y * f->sb128w + col_sb128_start;
          t->bx < ts->tiling.col_end; t->bx += sb_step)
     {
+        if (atomic_load_explicit(&t->tile_thread.flush, memory_order_acquire))
+            return 1;
         if (root_bl == BL_128X128) {
             t->cur_sb_cdef_idx_ptr = t->lf_mask->cdef_idx;
             t->cur_sb_cdef_idx_ptr[0] = -1;
@@ -2974,7 +2978,7 @@
                               &f->frame_thread.td.lock);
         out_delayed = &c->frame_thread.out_delayed[next];
         if (out_delayed->p.data[0]) {
-            if (out_delayed->visible && !out_delayed->flushed)
+            if (out_delayed->visible)
                 dav1d_picture_ref(&c->out, &out_delayed->p);
             dav1d_thread_picture_unref(out_delayed);
         }
--- a/src/internal.h
+++ b/src/internal.h
@@ -291,6 +291,7 @@
         struct thread_data td;
         struct FrameTileThreadData *fttd;
         int die;
+        atomic_int flush;
     } tile_thread;
 };
 
--- a/src/lib.c
+++ b/src/lib.c
@@ -134,6 +134,7 @@
                 t->tile_thread.fttd = &f->tile_thread;
                 pthread_create(&t->tile_thread.td.thread, NULL, dav1d_tile_task, t);
             }
+            atomic_init(&t->tile_thread.flush, 0);
         }
         f->libaom_cm = av1_alloc_ref_mv_common();
         if (!f->libaom_cm) goto error;
@@ -254,11 +255,8 @@
             if (out_delayed->p.data[0]) {
                 const unsigned progress = atomic_load_explicit(&out_delayed->progress[1],
                                                                memory_order_relaxed);
-                if (out_delayed->visible && !out_delayed->flushed &&
-                    progress != FRAME_ERROR)
-                {
+                if (out_delayed->visible && progress != FRAME_ERROR)
                     dav1d_picture_ref(&c->out, &out_delayed->p);
-                }
                 dav1d_thread_picture_unref(out_delayed);
                 if (c->out.data[0])
                     return output_image(c, out, &c->out);
@@ -292,8 +290,32 @@
 
     if (c->n_fc == 1) return;
 
-    for (unsigned n = 0; n < c->n_fc; n++)
-        c->frame_thread.out_delayed[n].flushed = 1;
+    // mark each currently-running frame as flushing, so that we
+    // exit out as quickly as the running thread checks this flag
+    for (unsigned n = 0; n < c->n_fc; n++) {
+        Dav1dFrameContext *const f = &c->fc[n];
+        for (int m = 0; m < f->n_tc; m++)
+            atomic_store(&f->tc[m].tile_thread.flush, 1);
+    }
+    for (unsigned n = 0, next = c->frame_thread.next; n < c->n_fc; n++, next++) {
+        if (next == c->n_fc) next = 0;
+        Dav1dFrameContext *const f = &c->fc[next];
+        pthread_mutex_lock(&f->frame_thread.td.lock);
+        if (f->n_tile_data > 0) {
+            while (f->n_tile_data > 0)
+                pthread_cond_wait(&f->frame_thread.td.cond,
+                                  &f->frame_thread.td.lock);
+            assert(!f->cur.data[0]);
+        }
+        pthread_mutex_unlock(&f->frame_thread.td.lock);
+        for (int m = 0; m < f->n_tc; m++)
+            atomic_store(&f->tc[m].tile_thread.flush, 0);
+        Dav1dThreadPicture *const out_delayed = &c->frame_thread.out_delayed[next];
+        if (out_delayed->p.data[0])
+            dav1d_thread_picture_unref(out_delayed);
+    }
+
+    c->frame_thread.next = 0;
 }
 
 void dav1d_close(Dav1dContext **const c_out) {
@@ -302,37 +324,17 @@
     Dav1dContext *const c = *c_out;
     if (!c) return;
 
+    dav1d_flush(c);
     for (unsigned n = 0; n < c->n_fc; n++) {
         Dav1dFrameContext *const f = &c->fc[n];
 
         // clean-up threading stuff
         if (c->n_fc > 1) {
-            if (f->frame_hdr.refresh_context)
-                dav1d_cdf_thread_signal(&f->out_cdf);
-            dav1d_thread_picture_signal(&f->sr_cur, FRAME_ERROR,
-                                        PLANE_TYPE_ALL);
             pthread_mutex_lock(&f->frame_thread.td.lock);
             f->frame_thread.die = 1;
             pthread_cond_signal(&f->frame_thread.td.cond);
             pthread_mutex_unlock(&f->frame_thread.td.lock);
             pthread_join(f->frame_thread.td.thread, NULL);
-            // free references from dav1d_submit_frame() usually freed by
-            // dav1d_decode_frame
-            for (int i = 0; i < 7; i++) {
-                if (f->refp[i].p.data[0])
-                    dav1d_thread_picture_unref(&f->refp[i]);
-                dav1d_ref_dec(&f->ref_mvs_ref[i]);
-            }
-            dav1d_picture_unref(&f->cur);
-            dav1d_thread_picture_unref(&f->sr_cur);
-            dav1d_cdf_thread_unref(&f->in_cdf);
-            if (f->frame_hdr.refresh_context)
-                dav1d_cdf_thread_unref(&f->out_cdf);
-            dav1d_ref_dec(&f->cur_segmap_ref);
-            dav1d_ref_dec(&f->prev_segmap_ref);
-            dav1d_ref_dec(&f->mvs_ref);
-            for (int i = 0; i < f->n_tile_data; i++)
-                dav1d_data_unref(&f->tile[i].data);
             freep(&f->frame_thread.b);
             dav1d_freep_aligned(&f->frame_thread.pal_idx);
             dav1d_freep_aligned(&f->frame_thread.cf);
--- a/src/obu.c
+++ b/src/obu.c
@@ -1339,17 +1339,13 @@
             if (out_delayed->p.data[0]) {
                 const unsigned progress = atomic_load_explicit(&out_delayed->progress[1],
                                                                memory_order_relaxed);
-                if (out_delayed->visible && !out_delayed->flushed &&
-                    progress != FRAME_ERROR)
-                {
+                if (out_delayed->visible && progress != FRAME_ERROR)
                     dav1d_picture_ref(&c->out, &out_delayed->p);
-                }
                 dav1d_thread_picture_unref(out_delayed);
             }
             dav1d_thread_picture_ref(out_delayed,
                                      &c->refs[c->frame_hdr.existing_frame_idx].p);
             out_delayed->visible = 1;
-            out_delayed->flushed = 0;
             out_delayed->p.m = in->m;
             pthread_mutex_unlock(&f->frame_thread.td.lock);
         }
--- a/src/picture.c
+++ b/src/picture.c
@@ -160,7 +160,6 @@
     if (res) return res;
 
     p->visible = visible;
-    p->flushed = 0;
     if (t) {
         atomic_init(&p->progress[0], 0);
         atomic_init(&p->progress[1], 0);
@@ -217,7 +216,6 @@
     dst->t = src->t;
     dst->visible = src->visible;
     dst->progress = src->progress;
-    dst->flushed = src->flushed;
 }
 
 void dav1d_picture_unref(Dav1dPicture *const p) {
@@ -273,8 +271,10 @@
         return;
 
     pthread_mutex_lock(&p->t->lock);
-    if (plane_type != PLANE_TYPE_Y) atomic_store(&p->progress[0], y);
-    if (plane_type != PLANE_TYPE_BLOCK) atomic_store(&p->progress[1], y);
+    if (plane_type != PLANE_TYPE_Y)
+        atomic_store(&p->progress[0], y);
+    if (plane_type != PLANE_TYPE_BLOCK)
+        atomic_store(&p->progress[1], y);
     pthread_cond_broadcast(&p->t->cond);
     pthread_mutex_unlock(&p->t->lock);
 }
--- a/src/picture.h
+++ b/src/picture.h
@@ -44,7 +44,7 @@
 
 typedef struct Dav1dThreadPicture {
     Dav1dPicture p;
-    int visible, flushed;
+    int visible;
     struct thread_data *t;
     // [0] block data (including segmentation map and motion vectors)
     // [1] pixel data