diff options
Diffstat (limited to 'src/input/AsyncInputStream.cxx')
-rw-r--r-- | src/input/AsyncInputStream.cxx | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index 772a87df0..72decc465 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -19,6 +19,7 @@ #include "config.h" #include "AsyncInputStream.hxx" +#include "CondHandler.hxx" #include "tag/Tag.hxx" #include "thread/Cond.hxx" #include "event/Loop.hxx" @@ -29,10 +30,10 @@ #include <string.h> AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url, - Mutex &_mutex, Cond &_cond, + Mutex &_mutex, size_t _buffer_size, size_t _resume_at) - :InputStream(_url, _mutex, _cond), + :InputStream(_url, _mutex), deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)), deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)), allocation(_buffer_size), @@ -133,8 +134,10 @@ AsyncInputStream::Seek(offset_type new_offset) deferred_seek.Schedule(); + CondInputStreamHandler cond_handler; + const ScopeExchangeInputStreamHandler h(*this, &cond_handler); while (seek_state != SeekState::NONE) - cond.wait(mutex); + cond_handler.cond.wait(mutex); Check(); } @@ -151,7 +154,7 @@ AsyncInputStream::SeekDone() noexcept open = true; seek_state = SeekState::NONE; - cond.broadcast(); + InvokeOnAvailable(); } std::unique_ptr<Tag> @@ -173,6 +176,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size) { assert(!GetEventLoop().IsInside()); + CondInputStreamHandler cond_handler; + /* wait for data */ CircularBuffer<uint8_t>::Range r; while (true) { @@ -182,7 +187,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size) if (!r.empty() || IsEOF()) break; - cond.wait(mutex); + const ScopeExchangeInputStreamHandler h(*this, &cond_handler); + cond_handler.cond.wait(mutex); } const size_t nbytes = std::min(read_size, r.size); @@ -205,7 +211,7 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); } void @@ -231,7 +237,7 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept if (!IsReady()) SetReady(); else - cond.broadcast(); + InvokeOnAvailable(); } void @@ -243,7 +249,7 @@ AsyncInputStream::DeferredResume() noexcept Resume(); } catch (...) { postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); } } @@ -265,6 +271,6 @@ AsyncInputStream::DeferredSeek() noexcept } catch (...) { seek_state = SeekState::NONE; postponed_exception = std::current_exception(); - cond.broadcast(); + InvokeOnAvailable(); } } |