summaryrefslogtreecommitdiff
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/AsyncInputStream.cxx10
-rw-r--r--src/input/AsyncInputStream.hxx6
-rw-r--r--src/input/BufferedInputStream.cxx17
-rw-r--r--src/input/BufferedInputStream.hxx5
-rw-r--r--src/input/FailingInputStream.hxx4
-rw-r--r--src/input/IcyInputStream.cxx7
-rw-r--r--src/input/IcyInputStream.hxx3
-rw-r--r--src/input/InputStream.cxx22
-rw-r--r--src/input/InputStream.hxx20
-rw-r--r--src/input/ProxyInputStream.cxx14
-rw-r--r--src/input/ProxyInputStream.hxx6
-rw-r--r--src/input/RewindInputStream.cxx14
-rw-r--r--src/input/ThreadInputStream.cxx5
-rw-r--r--src/input/ThreadInputStream.hxx3
-rw-r--r--src/input/plugins/CdioParanoiaInputPlugin.cxx11
-rw-r--r--src/input/plugins/FfmpegInputPlugin.cxx11
-rw-r--r--src/input/plugins/FileInputPlugin.cxx12
-rw-r--r--src/input/plugins/SmbclientInputPlugin.cxx11
18 files changed, 109 insertions, 72 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx
index ce78d166d..3ccd41353 100644
--- a/src/input/AsyncInputStream.cxx
+++ b/src/input/AsyncInputStream.cxx
@@ -95,7 +95,8 @@ AsyncInputStream::IsEOF() noexcept
}
void
-AsyncInputStream::Seek(offset_type new_offset)
+AsyncInputStream::Seek(std::unique_lock<Mutex> &lock,
+ offset_type new_offset)
{
assert(IsReady());
assert(seek_state == SeekState::NONE);
@@ -136,7 +137,7 @@ AsyncInputStream::Seek(offset_type new_offset)
CondInputStreamHandler cond_handler;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
while (seek_state != SeekState::NONE)
- cond_handler.cond.wait(mutex);
+ cond_handler.cond.wait(lock);
Check();
}
@@ -171,7 +172,8 @@ AsyncInputStream::IsAvailable() noexcept
}
size_t
-AsyncInputStream::Read(void *ptr, size_t read_size)
+AsyncInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size)
{
assert(!GetEventLoop().IsInside());
@@ -187,7 +189,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
break;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
- cond_handler.cond.wait(mutex);
+ cond_handler.cond.wait(lock);
}
const size_t nbytes = std::min(read_size, r.size);
diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx
index 958f5f690..2577cd9fa 100644
--- a/src/input/AsyncInputStream.hxx
+++ b/src/input/AsyncInputStream.hxx
@@ -83,10 +83,12 @@ public:
/* virtual methods from InputStream */
void Check() final;
bool IsEOF() noexcept final;
- void Seek(offset_type new_offset) final;
+ void Seek(std::unique_lock<Mutex> &lock,
+ offset_type new_offset) final;
std::unique_ptr<Tag> ReadTag() noexcept final;
bool IsAvailable() noexcept final;
- size_t Read(void *ptr, size_t read_size) final;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size) final;
protected:
/**
diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx
index d28964dbb..69a557a79 100644
--- a/src/input/BufferedInputStream.cxx
+++ b/src/input/BufferedInputStream.cxx
@@ -64,7 +64,8 @@ BufferedInputStream::Check()
}
void
-BufferedInputStream::Seek(offset_type new_offset)
+BufferedInputStream::Seek(std::unique_lock<Mutex> &lock,
+ offset_type new_offset)
{
if (new_offset >= size) {
offset = size;
@@ -84,7 +85,7 @@ BufferedInputStream::Seek(offset_type new_offset)
wake_cond.notify_one();
while (seek)
- client_cond.wait(mutex);
+ client_cond.wait(lock);
if (seek_error)
std::rethrow_exception(std::exchange(seek_error, {}));
@@ -105,7 +106,8 @@ BufferedInputStream::IsAvailable() noexcept
}
size_t
-BufferedInputStream::Read(void *ptr, size_t s)
+BufferedInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t s)
{
if (offset >= size)
return 0;
@@ -140,7 +142,7 @@ BufferedInputStream::Read(void *ptr, size_t s)
wake_cond.notify_one();
}
- client_cond.wait(mutex);
+ client_cond.wait(lock);
}
}
@@ -156,7 +158,7 @@ BufferedInputStream::RunThread() noexcept
if (seek) {
try {
- input->Seek(seek_offset);
+ input->Seek(lock, seek_offset);
} catch (...) {
seek_error = std::current_exception();
}
@@ -183,7 +185,7 @@ BufferedInputStream::RunThread() noexcept
offset to prepare filling
the buffer from there */
try {
- input->Seek(offset);
+ input->Seek(lock, offset);
} catch (...) {
read_error = std::current_exception();
client_cond.notify_one();
@@ -195,7 +197,8 @@ BufferedInputStream::RunThread() noexcept
}
try {
- size_t nbytes = input->Read(w.data, w.size);
+ size_t nbytes = input->Read(lock,
+ w.data, w.size);
buffer.Commit(read_offset,
read_offset + nbytes);
} catch (...) {
diff --git a/src/input/BufferedInputStream.hxx b/src/input/BufferedInputStream.hxx
index 46a85a470..dcb6282a1 100644
--- a/src/input/BufferedInputStream.hxx
+++ b/src/input/BufferedInputStream.hxx
@@ -85,12 +85,13 @@ public:
/* we don't need to implement Update() because all attributes
have been copied already in our constructor */
//void Update() noexcept;
- void Seek(offset_type offset) override;
+ void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
bool IsEOF() noexcept override;
/* we don't support tags */
// std::unique_ptr<Tag> ReadTag() override;
bool IsAvailable() noexcept override;
- size_t Read(void *ptr, size_t size) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
/* virtual methods from class InputStreamHandler */
void OnInputStreamReady() noexcept override {
diff --git a/src/input/FailingInputStream.hxx b/src/input/FailingInputStream.hxx
index 008a1ceae..f150fd972 100644
--- a/src/input/FailingInputStream.hxx
+++ b/src/input/FailingInputStream.hxx
@@ -45,7 +45,7 @@ public:
std::rethrow_exception(error);
}
- void Seek(offset_type) override {
+ void Seek(std::unique_lock<Mutex> &, offset_type) override {
std::rethrow_exception(error);
}
@@ -53,7 +53,7 @@ public:
return false;
}
- size_t Read(void *, size_t) override {
+ size_t Read(std::unique_lock<Mutex> &, void *, size_t) override {
std::rethrow_exception(error);
}
};
diff --git a/src/input/IcyInputStream.cxx b/src/input/IcyInputStream.cxx
index 4614d8f81..3d85ea47a 100644
--- a/src/input/IcyInputStream.cxx
+++ b/src/input/IcyInputStream.cxx
@@ -80,13 +80,14 @@ IcyInputStream::ReadTag() noexcept
}
size_t
-IcyInputStream::Read(void *ptr, size_t read_size)
+IcyInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size)
{
if (!IsEnabled())
- return ProxyInputStream::Read(ptr, read_size);
+ return ProxyInputStream::Read(lock, ptr, read_size);
while (true) {
- size_t nbytes = ProxyInputStream::Read(ptr, read_size);
+ size_t nbytes = ProxyInputStream::Read(lock, ptr, read_size);
if (nbytes == 0)
return 0;
diff --git a/src/input/IcyInputStream.hxx b/src/input/IcyInputStream.hxx
index 5a6a69074..183cc46d7 100644
--- a/src/input/IcyInputStream.hxx
+++ b/src/input/IcyInputStream.hxx
@@ -66,7 +66,8 @@ public:
/* virtual methods from InputStream */
void Update() noexcept override;
std::unique_ptr<Tag> ReadTag() noexcept override;
- size_t Read(void *ptr, size_t size) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
};
#endif
diff --git a/src/input/InputStream.cxx b/src/input/InputStream.cxx
index e125a0409..f808fb5ee 100644
--- a/src/input/InputStream.cxx
+++ b/src/input/InputStream.cxx
@@ -72,7 +72,7 @@ InputStream::CheapSeeking() const noexcept
}
void
-InputStream::Seek(gcc_unused offset_type new_offset)
+InputStream::Seek(std::unique_lock<Mutex> &, gcc_unused offset_type new_offset)
{
throw std::runtime_error("Seeking is not implemented");
}
@@ -80,15 +80,15 @@ InputStream::Seek(gcc_unused offset_type new_offset)
void
InputStream::LockSeek(offset_type _offset)
{
- const std::lock_guard<Mutex> protect(mutex);
- Seek(_offset);
+ std::unique_lock<Mutex> lock(mutex);
+ Seek(lock, _offset);
}
void
InputStream::LockSkip(offset_type _offset)
{
- const std::lock_guard<Mutex> protect(mutex);
- Skip(_offset);
+ std::unique_lock<Mutex> lock(mutex);
+ Skip(lock, _offset);
}
std::unique_ptr<Tag>
@@ -119,18 +119,18 @@ InputStream::LockRead(void *ptr, size_t _size)
#endif
assert(_size > 0);
- const std::lock_guard<Mutex> protect(mutex);
- return Read(ptr, _size);
+ std::unique_lock<Mutex> lock(mutex);
+ return Read(lock, ptr, _size);
}
void
-InputStream::ReadFull(void *_ptr, size_t _size)
+InputStream::ReadFull(std::unique_lock<Mutex> &lock, void *_ptr, size_t _size)
{
uint8_t *ptr = (uint8_t *)_ptr;
size_t nbytes_total = 0;
while (_size > 0) {
- size_t nbytes = Read(ptr + nbytes_total, _size);
+ size_t nbytes = Read(lock, ptr + nbytes_total, _size);
if (nbytes == 0)
throw std::runtime_error("Unexpected end of file");
@@ -148,8 +148,8 @@ InputStream::LockReadFull(void *ptr, size_t _size)
#endif
assert(_size > 0);
- const std::lock_guard<Mutex> protect(mutex);
- ReadFull(ptr, _size);
+ std::unique_lock<Mutex> lock(mutex);
+ ReadFull(lock, ptr, _size);
}
bool
diff --git a/src/input/InputStream.hxx b/src/input/InputStream.hxx
index ba8156d6e..7b19a1e0b 100644
--- a/src/input/InputStream.hxx
+++ b/src/input/InputStream.hxx
@@ -271,9 +271,11 @@ public:
*
* Throws std::runtime_error on error.
*
+ * @param lock the locked mutex; may be used to wait on
+ * condition variables
* @param offset the relative offset
*/
- virtual void Seek(offset_type offset);
+ virtual void Seek(std::unique_lock<Mutex> &lock, offset_type offset);
/**
* Wrapper for Seek() which locks and unlocks the mutex; the
@@ -285,8 +287,8 @@ public:
* Rewind to the beginning of the stream. This is a wrapper
* for Seek(0, error).
*/
- void Rewind() {
- Seek(0);
+ void Rewind(std::unique_lock<Mutex> &lock) {
+ Seek(lock, 0);
}
void LockRewind() {
@@ -296,8 +298,9 @@ public:
/**
* Skip input bytes.
*/
- void Skip(offset_type _offset) {
- Seek(GetOffset() + _offset);
+ void Skip(std::unique_lock<Mutex> &lock,
+ offset_type _offset) {
+ Seek(lock, GetOffset() + _offset);
}
void LockSkip(offset_type _offset);
@@ -351,12 +354,15 @@ public:
*
* Throws std::runtime_error on error.
*
+ * @param lock the locked mutex; may be used to wait on
+ * condition variables
* @param ptr the buffer to read into
* @param size the maximum number of bytes to read
* @return the number of bytes read
*/
gcc_nonnull_all
- virtual size_t Read(void *ptr, size_t size) = 0;
+ virtual size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) = 0;
/**
* Wrapper for Read() which locks and unlocks the mutex;
@@ -379,7 +385,7 @@ public:
* @return true if the whole data was read, false otherwise.
*/
gcc_nonnull_all
- void ReadFull(void *ptr, size_t size);
+ void ReadFull(std::unique_lock<Mutex> &lock, void *ptr, size_t size);
/**
* Wrapper for ReadFull() which locks and unlocks the mutex;
diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx
index f05f439f7..345e6f319 100644
--- a/src/input/ProxyInputStream.cxx
+++ b/src/input/ProxyInputStream.cxx
@@ -89,12 +89,13 @@ ProxyInputStream::Update() noexcept
}
void
-ProxyInputStream::Seek(offset_type new_offset)
+ProxyInputStream::Seek(std::unique_lock<Mutex> &lock,
+ offset_type new_offset)
{
while (!input)
- set_input_cond.wait(mutex);
+ set_input_cond.wait(lock);
- input->Seek(new_offset);
+ input->Seek(lock, new_offset);
CopyAttributes();
}
@@ -120,12 +121,13 @@ ProxyInputStream::IsAvailable() noexcept
}
size_t
-ProxyInputStream::Read(void *ptr, size_t read_size)
+ProxyInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size)
{
while (!input)
- set_input_cond.wait(mutex);
+ set_input_cond.wait(lock);
- size_t nbytes = input->Read(ptr, read_size);
+ size_t nbytes = input->Read(lock, ptr, read_size);
CopyAttributes();
return nbytes;
}
diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx
index 853deb11c..d030e16ed 100644
--- a/src/input/ProxyInputStream.hxx
+++ b/src/input/ProxyInputStream.hxx
@@ -60,11 +60,13 @@ public:
/* virtual methods from InputStream */
void Check() override;
void Update() noexcept override;
- void Seek(offset_type new_offset) override;
+ void Seek(std::unique_lock<Mutex> &lock,
+ offset_type new_offset) override;
bool IsEOF() noexcept override;
std::unique_ptr<Tag> ReadTag() noexcept override;
bool IsAvailable() noexcept override;
- size_t Read(void *ptr, size_t read_size) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size) override;
protected:
/**
diff --git a/src/input/RewindInputStream.cxx b/src/input/RewindInputStream.cxx
index 98f2d1967..cb760adb1 100644
--- a/src/input/RewindInputStream.cxx
+++ b/src/input/RewindInputStream.cxx
@@ -60,8 +60,9 @@ public:
return !ReadingFromBuffer() && ProxyInputStream::IsEOF();
}
- size_t Read(void *ptr, size_t size) override;
- void Seek(offset_type offset) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
+ void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
private:
/**
@@ -74,7 +75,8 @@ private:
};
size_t
-RewindInputStream::Read(void *ptr, size_t read_size)
+RewindInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size)
{
if (ReadingFromBuffer()) {
/* buffered read */
@@ -93,7 +95,7 @@ RewindInputStream::Read(void *ptr, size_t read_size)
} else {
/* pass method call to underlying stream */
- size_t nbytes = input->Read(ptr, read_size);
+ size_t nbytes = input->Read(lock, ptr, read_size);
if (input->GetOffset() > (offset_type)sizeof(buffer))
/* disable buffering */
@@ -114,7 +116,7 @@ RewindInputStream::Read(void *ptr, size_t read_size)
}
void
-RewindInputStream::Seek(offset_type new_offset)
+RewindInputStream::Seek(std::unique_lock<Mutex> &lock, offset_type new_offset)
{
assert(IsReady());
@@ -132,7 +134,7 @@ RewindInputStream::Seek(offset_type new_offset)
buffered range now */
tail = 0;
- ProxyInputStream::Seek(new_offset);
+ ProxyInputStream::Seek(lock, new_offset);
}
}
diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx
index 5a21022e1..e1be3ecdc 100644
--- a/src/input/ThreadInputStream.cxx
+++ b/src/input/ThreadInputStream.cxx
@@ -130,7 +130,8 @@ ThreadInputStream::IsAvailable() noexcept
}
inline size_t
-ThreadInputStream::Read(void *ptr, size_t read_size)
+ThreadInputStream::Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t read_size)
{
assert(!thread.IsInside());
@@ -154,7 +155,7 @@ ThreadInputStream::Read(void *ptr, size_t read_size)
return 0;
const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
- cond_handler.cond.wait(mutex);
+ cond_handler.cond.wait(lock);
}
}
diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx
index 14b849238..3c946e12a 100644
--- a/src/input/ThreadInputStream.hxx
+++ b/src/input/ThreadInputStream.hxx
@@ -94,7 +94,8 @@ public:
void Check() override final;
bool IsEOF() noexcept final;
bool IsAvailable() noexcept final;
- size_t Read(void *ptr, size_t size) override final;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override final;
protected:
/**
diff --git a/src/input/plugins/CdioParanoiaInputPlugin.cxx b/src/input/plugins/CdioParanoiaInputPlugin.cxx
index 0079e55a9..9178f8a4a 100644
--- a/src/input/plugins/CdioParanoiaInputPlugin.cxx
+++ b/src/input/plugins/CdioParanoiaInputPlugin.cxx
@@ -91,8 +91,9 @@ class CdioParanoiaInputStream final : public InputStream {
/* virtual methods from InputStream */
bool IsEOF() noexcept override;
- size_t Read(void *ptr, size_t size) override;
- void Seek(offset_type offset) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
+ void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
static constexpr Domain cdio_domain("cdio");
@@ -255,7 +256,8 @@ input_cdio_open(const char *uri,
}
void
-CdioParanoiaInputStream::Seek(offset_type new_offset)
+CdioParanoiaInputStream::Seek(std::unique_lock<Mutex> &,
+ offset_type new_offset)
{
if (new_offset > size)
throw FormatRuntimeError("Invalid offset to seek %ld (%ld)",
@@ -276,7 +278,8 @@ CdioParanoiaInputStream::Seek(offset_type new_offset)
}
size_t
-CdioParanoiaInputStream::Read(void *ptr, size_t length)
+CdioParanoiaInputStream::Read(std::unique_lock<Mutex> &,
+ void *ptr, size_t length)
{
size_t nbytes = 0;
char *wptr = (char *) ptr;
diff --git a/src/input/plugins/FfmpegInputPlugin.cxx b/src/input/plugins/FfmpegInputPlugin.cxx
index 198f4bd17..febc44872 100644
--- a/src/input/plugins/FfmpegInputPlugin.cxx
+++ b/src/input/plugins/FfmpegInputPlugin.cxx
@@ -49,8 +49,10 @@ public:
/* virtual methods from InputStream */
bool IsEOF() noexcept override;
- size_t Read(void *ptr, size_t size) override;
- void Seek(offset_type offset) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
+ void Seek(std::unique_lock<Mutex> &lock,
+ offset_type offset) override;
};
gcc_const
@@ -79,7 +81,8 @@ input_ffmpeg_open(const char *uri,
}
size_t
-FfmpegInputStream::Read(void *ptr, size_t read_size)
+FfmpegInputStream::Read(std::unique_lock<Mutex> &,
+ void *ptr, size_t read_size)
{
size_t result;
@@ -99,7 +102,7 @@ FfmpegInputStream::IsEOF() noexcept
}
void
-FfmpegInputStream::Seek(offset_type new_offset)
+FfmpegInputStream::Seek(std::unique_lock<Mutex> &, offset_type new_offset)
{
uint64_t result;
diff --git a/src/input/plugins/FileInputPlugin.cxx b/src/input/plugins/FileInputPlugin.cxx
index 989d3de74..a61c94c2b 100644
--- a/src/input/plugins/FileInputPlugin.cxx
+++ b/src/input/plugins/FileInputPlugin.cxx
@@ -48,8 +48,10 @@ public:
return GetOffset() >= GetSize();
}
- size_t Read(void *ptr, size_t size) override;
- void Seek(offset_type offset) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
+ void Seek(std::unique_lock<Mutex> &lock,
+ offset_type offset) override;
};
InputStreamPtr
@@ -74,7 +76,8 @@ OpenFileInputStream(Path path, Mutex &mutex)
}
void
-FileInputStream::Seek(offset_type new_offset)
+FileInputStream::Seek(std::unique_lock<Mutex> &,
+ offset_type new_offset)
{
{
const ScopeUnlock unlock(mutex);
@@ -85,7 +88,8 @@ FileInputStream::Seek(offset_type new_offset)
}
size_t
-FileInputStream::Read(void *ptr, size_t read_size)
+FileInputStream::Read(std::unique_lock<Mutex> &,
+ void *ptr, size_t read_size)
{
size_t nbytes;
diff --git a/src/input/plugins/SmbclientInputPlugin.cxx b/src/input/plugins/SmbclientInputPlugin.cxx
index dfd95103d..361d0e18e 100644
--- a/src/input/plugins/SmbclientInputPlugin.cxx
+++ b/src/input/plugins/SmbclientInputPlugin.cxx
@@ -56,8 +56,9 @@ public:
return offset >= size;
}
- size_t Read(void *ptr, size_t size) override;
- void Seek(offset_type offset) override;
+ size_t Read(std::unique_lock<Mutex> &lock,
+ void *ptr, size_t size) override;
+ void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override;
};
/*
@@ -118,7 +119,8 @@ input_smbclient_open(const char *uri,
}
size_t
-SmbclientInputStream::Read(void *ptr, size_t read_size)
+SmbclientInputStream::Read(std::unique_lock<Mutex> &,
+ void *ptr, size_t read_size)
{
ssize_t nbytes;
@@ -136,7 +138,8 @@ SmbclientInputStream::Read(void *ptr, size_t read_size)
}
void
-SmbclientInputStream::Seek(offset_type new_offset)
+SmbclientInputStream::Seek(std::unique_lock<Mutex> &,
+ offset_type new_offset)
{
off_t result;