summaryrefslogtreecommitdiff
path: root/src/input/AsyncInputStream.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'src/input/AsyncInputStream.cxx')
-rw-r--r--src/input/AsyncInputStream.cxx24
1 files changed, 15 insertions, 9 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx
index 772a87df0..72decc465 100644
--- a/src/input/AsyncInputStream.cxx
+++ b/src/input/AsyncInputStream.cxx
@@ -19,6 +19,7 @@
#include "config.h"
#include "AsyncInputStream.hxx"
+#include "CondHandler.hxx"
#include "tag/Tag.hxx"
#include "thread/Cond.hxx"
#include "event/Loop.hxx"
@@ -29,10 +30,10 @@
#include <string.h>
AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
- Mutex &_mutex, Cond &_cond,
+ Mutex &_mutex,
size_t _buffer_size,
size_t _resume_at)
- :InputStream(_url, _mutex, _cond),
+ :InputStream(_url, _mutex),
deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
allocation(_buffer_size),
@@ -133,8 +134,10 @@ AsyncInputStream::Seek(offset_type new_offset)
deferred_seek.Schedule();
+ CondInputStreamHandler cond_handler;
+ const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
while (seek_state != SeekState::NONE)
- cond.wait(mutex);
+ cond_handler.cond.wait(mutex);
Check();
}
@@ -151,7 +154,7 @@ AsyncInputStream::SeekDone() noexcept
open = true;
seek_state = SeekState::NONE;
- cond.broadcast();
+ InvokeOnAvailable();
}
std::unique_ptr<Tag>
@@ -173,6 +176,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
{
assert(!GetEventLoop().IsInside());
+ CondInputStreamHandler cond_handler;
+
/* wait for data */
CircularBuffer<uint8_t>::Range r;
while (true) {
@@ -182,7 +187,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
if (!r.empty() || IsEOF())
break;
- cond.wait(mutex);
+ const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
+ cond_handler.cond.wait(mutex);
}
const size_t nbytes = std::min(read_size, r.size);
@@ -205,7 +211,7 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
}
void
@@ -231,7 +237,7 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
}
void
@@ -243,7 +249,7 @@ AsyncInputStream::DeferredResume() noexcept
Resume();
} catch (...) {
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
}
}
@@ -265,6 +271,6 @@ AsyncInputStream::DeferredSeek() noexcept
} catch (...) {
seek_state = SeekState::NONE;
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
}
}