diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/AsyncInputStream.cxx | 10 | ||||
-rw-r--r-- | src/input/AsyncInputStream.hxx | 6 | ||||
-rw-r--r-- | src/input/BufferedInputStream.cxx | 17 | ||||
-rw-r--r-- | src/input/BufferedInputStream.hxx | 5 | ||||
-rw-r--r-- | src/input/FailingInputStream.hxx | 4 | ||||
-rw-r--r-- | src/input/IcyInputStream.cxx | 7 | ||||
-rw-r--r-- | src/input/IcyInputStream.hxx | 3 | ||||
-rw-r--r-- | src/input/InputStream.cxx | 22 | ||||
-rw-r--r-- | src/input/InputStream.hxx | 20 | ||||
-rw-r--r-- | src/input/ProxyInputStream.cxx | 14 | ||||
-rw-r--r-- | src/input/ProxyInputStream.hxx | 6 | ||||
-rw-r--r-- | src/input/RewindInputStream.cxx | 14 | ||||
-rw-r--r-- | src/input/ThreadInputStream.cxx | 5 | ||||
-rw-r--r-- | src/input/ThreadInputStream.hxx | 3 | ||||
-rw-r--r-- | src/input/plugins/CdioParanoiaInputPlugin.cxx | 11 | ||||
-rw-r--r-- | src/input/plugins/FfmpegInputPlugin.cxx | 11 | ||||
-rw-r--r-- | src/input/plugins/FileInputPlugin.cxx | 12 | ||||
-rw-r--r-- | src/input/plugins/SmbclientInputPlugin.cxx | 11 |
18 files changed, 109 insertions, 72 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx index ce78d166d..3ccd41353 100644 --- a/src/input/AsyncInputStream.cxx +++ b/src/input/AsyncInputStream.cxx @@ -95,7 +95,8 @@ AsyncInputStream::IsEOF() noexcept } void -AsyncInputStream::Seek(offset_type new_offset) +AsyncInputStream::Seek(std::unique_lock<Mutex> &lock, + offset_type new_offset) { assert(IsReady()); assert(seek_state == SeekState::NONE); @@ -136,7 +137,7 @@ AsyncInputStream::Seek(offset_type new_offset) CondInputStreamHandler cond_handler; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); while (seek_state != SeekState::NONE) - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); Check(); } @@ -171,7 +172,8 @@ AsyncInputStream::IsAvailable() noexcept } size_t -AsyncInputStream::Read(void *ptr, size_t read_size) +AsyncInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) { assert(!GetEventLoop().IsInside()); @@ -187,7 +189,7 @@ AsyncInputStream::Read(void *ptr, size_t read_size) break; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); } const size_t nbytes = std::min(read_size, r.size); diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx index 958f5f690..2577cd9fa 100644 --- a/src/input/AsyncInputStream.hxx +++ b/src/input/AsyncInputStream.hxx @@ -83,10 +83,12 @@ public: /* virtual methods from InputStream */ void Check() final; bool IsEOF() noexcept final; - void Seek(offset_type new_offset) final; + void Seek(std::unique_lock<Mutex> &lock, + offset_type new_offset) final; std::unique_ptr<Tag> ReadTag() noexcept final; bool IsAvailable() noexcept final; - size_t Read(void *ptr, size_t read_size) final; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) final; protected: /** diff --git a/src/input/BufferedInputStream.cxx b/src/input/BufferedInputStream.cxx index d28964dbb..69a557a79 100644 --- a/src/input/BufferedInputStream.cxx +++ b/src/input/BufferedInputStream.cxx @@ -64,7 +64,8 @@ BufferedInputStream::Check() } void -BufferedInputStream::Seek(offset_type new_offset) +BufferedInputStream::Seek(std::unique_lock<Mutex> &lock, + offset_type new_offset) { if (new_offset >= size) { offset = size; @@ -84,7 +85,7 @@ BufferedInputStream::Seek(offset_type new_offset) wake_cond.notify_one(); while (seek) - client_cond.wait(mutex); + client_cond.wait(lock); if (seek_error) std::rethrow_exception(std::exchange(seek_error, {})); @@ -105,7 +106,8 @@ BufferedInputStream::IsAvailable() noexcept } size_t -BufferedInputStream::Read(void *ptr, size_t s) +BufferedInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t s) { if (offset >= size) return 0; @@ -140,7 +142,7 @@ BufferedInputStream::Read(void *ptr, size_t s) wake_cond.notify_one(); } - client_cond.wait(mutex); + client_cond.wait(lock); } } @@ -156,7 +158,7 @@ BufferedInputStream::RunThread() noexcept if (seek) { try { - input->Seek(seek_offset); + input->Seek(lock, seek_offset); } catch (...) { seek_error = std::current_exception(); } @@ -183,7 +185,7 @@ BufferedInputStream::RunThread() noexcept offset to prepare filling the buffer from there */ try { - input->Seek(offset); + input->Seek(lock, offset); } catch (...) { read_error = std::current_exception(); client_cond.notify_one(); @@ -195,7 +197,8 @@ BufferedInputStream::RunThread() noexcept } try { - size_t nbytes = input->Read(w.data, w.size); + size_t nbytes = input->Read(lock, + w.data, w.size); buffer.Commit(read_offset, read_offset + nbytes); } catch (...) { diff --git a/src/input/BufferedInputStream.hxx b/src/input/BufferedInputStream.hxx index 46a85a470..dcb6282a1 100644 --- a/src/input/BufferedInputStream.hxx +++ b/src/input/BufferedInputStream.hxx @@ -85,12 +85,13 @@ public: /* we don't need to implement Update() because all attributes have been copied already in our constructor */ //void Update() noexcept; - void Seek(offset_type offset) override; + void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override; bool IsEOF() noexcept override; /* we don't support tags */ // std::unique_ptr<Tag> ReadTag() override; bool IsAvailable() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; /* virtual methods from class InputStreamHandler */ void OnInputStreamReady() noexcept override { diff --git a/src/input/FailingInputStream.hxx b/src/input/FailingInputStream.hxx index 008a1ceae..f150fd972 100644 --- a/src/input/FailingInputStream.hxx +++ b/src/input/FailingInputStream.hxx @@ -45,7 +45,7 @@ public: std::rethrow_exception(error); } - void Seek(offset_type) override { + void Seek(std::unique_lock<Mutex> &, offset_type) override { std::rethrow_exception(error); } @@ -53,7 +53,7 @@ public: return false; } - size_t Read(void *, size_t) override { + size_t Read(std::unique_lock<Mutex> &, void *, size_t) override { std::rethrow_exception(error); } }; diff --git a/src/input/IcyInputStream.cxx b/src/input/IcyInputStream.cxx index 4614d8f81..3d85ea47a 100644 --- a/src/input/IcyInputStream.cxx +++ b/src/input/IcyInputStream.cxx @@ -80,13 +80,14 @@ IcyInputStream::ReadTag() noexcept } size_t -IcyInputStream::Read(void *ptr, size_t read_size) +IcyInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) { if (!IsEnabled()) - return ProxyInputStream::Read(ptr, read_size); + return ProxyInputStream::Read(lock, ptr, read_size); while (true) { - size_t nbytes = ProxyInputStream::Read(ptr, read_size); + size_t nbytes = ProxyInputStream::Read(lock, ptr, read_size); if (nbytes == 0) return 0; diff --git a/src/input/IcyInputStream.hxx b/src/input/IcyInputStream.hxx index 5a6a69074..183cc46d7 100644 --- a/src/input/IcyInputStream.hxx +++ b/src/input/IcyInputStream.hxx @@ -66,7 +66,8 @@ public: /* virtual methods from InputStream */ void Update() noexcept override; std::unique_ptr<Tag> ReadTag() noexcept override; - size_t Read(void *ptr, size_t size) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; }; #endif diff --git a/src/input/InputStream.cxx b/src/input/InputStream.cxx index e125a0409..f808fb5ee 100644 --- a/src/input/InputStream.cxx +++ b/src/input/InputStream.cxx @@ -72,7 +72,7 @@ InputStream::CheapSeeking() const noexcept } void -InputStream::Seek(gcc_unused offset_type new_offset) +InputStream::Seek(std::unique_lock<Mutex> &, gcc_unused offset_type new_offset) { throw std::runtime_error("Seeking is not implemented"); } @@ -80,15 +80,15 @@ InputStream::Seek(gcc_unused offset_type new_offset) void InputStream::LockSeek(offset_type _offset) { - const std::lock_guard<Mutex> protect(mutex); - Seek(_offset); + std::unique_lock<Mutex> lock(mutex); + Seek(lock, _offset); } void InputStream::LockSkip(offset_type _offset) { - const std::lock_guard<Mutex> protect(mutex); - Skip(_offset); + std::unique_lock<Mutex> lock(mutex); + Skip(lock, _offset); } std::unique_ptr<Tag> @@ -119,18 +119,18 @@ InputStream::LockRead(void *ptr, size_t _size) #endif assert(_size > 0); - const std::lock_guard<Mutex> protect(mutex); - return Read(ptr, _size); + std::unique_lock<Mutex> lock(mutex); + return Read(lock, ptr, _size); } void -InputStream::ReadFull(void *_ptr, size_t _size) +InputStream::ReadFull(std::unique_lock<Mutex> &lock, void *_ptr, size_t _size) { uint8_t *ptr = (uint8_t *)_ptr; size_t nbytes_total = 0; while (_size > 0) { - size_t nbytes = Read(ptr + nbytes_total, _size); + size_t nbytes = Read(lock, ptr + nbytes_total, _size); if (nbytes == 0) throw std::runtime_error("Unexpected end of file"); @@ -148,8 +148,8 @@ InputStream::LockReadFull(void *ptr, size_t _size) #endif assert(_size > 0); - const std::lock_guard<Mutex> protect(mutex); - ReadFull(ptr, _size); + std::unique_lock<Mutex> lock(mutex); + ReadFull(lock, ptr, _size); } bool diff --git a/src/input/InputStream.hxx b/src/input/InputStream.hxx index ba8156d6e..7b19a1e0b 100644 --- a/src/input/InputStream.hxx +++ b/src/input/InputStream.hxx @@ -271,9 +271,11 @@ public: * * Throws std::runtime_error on error. * + * @param lock the locked mutex; may be used to wait on + * condition variables * @param offset the relative offset */ - virtual void Seek(offset_type offset); + virtual void Seek(std::unique_lock<Mutex> &lock, offset_type offset); /** * Wrapper for Seek() which locks and unlocks the mutex; the @@ -285,8 +287,8 @@ public: * Rewind to the beginning of the stream. This is a wrapper * for Seek(0, error). */ - void Rewind() { - Seek(0); + void Rewind(std::unique_lock<Mutex> &lock) { + Seek(lock, 0); } void LockRewind() { @@ -296,8 +298,9 @@ public: /** * Skip input bytes. */ - void Skip(offset_type _offset) { - Seek(GetOffset() + _offset); + void Skip(std::unique_lock<Mutex> &lock, + offset_type _offset) { + Seek(lock, GetOffset() + _offset); } void LockSkip(offset_type _offset); @@ -351,12 +354,15 @@ public: * * Throws std::runtime_error on error. * + * @param lock the locked mutex; may be used to wait on + * condition variables * @param ptr the buffer to read into * @param size the maximum number of bytes to read * @return the number of bytes read */ gcc_nonnull_all - virtual size_t Read(void *ptr, size_t size) = 0; + virtual size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) = 0; /** * Wrapper for Read() which locks and unlocks the mutex; @@ -379,7 +385,7 @@ public: * @return true if the whole data was read, false otherwise. */ gcc_nonnull_all - void ReadFull(void *ptr, size_t size); + void ReadFull(std::unique_lock<Mutex> &lock, void *ptr, size_t size); /** * Wrapper for ReadFull() which locks and unlocks the mutex; diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx index f05f439f7..345e6f319 100644 --- a/src/input/ProxyInputStream.cxx +++ b/src/input/ProxyInputStream.cxx @@ -89,12 +89,13 @@ ProxyInputStream::Update() noexcept } void -ProxyInputStream::Seek(offset_type new_offset) +ProxyInputStream::Seek(std::unique_lock<Mutex> &lock, + offset_type new_offset) { while (!input) - set_input_cond.wait(mutex); + set_input_cond.wait(lock); - input->Seek(new_offset); + input->Seek(lock, new_offset); CopyAttributes(); } @@ -120,12 +121,13 @@ ProxyInputStream::IsAvailable() noexcept } size_t -ProxyInputStream::Read(void *ptr, size_t read_size) +ProxyInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) { while (!input) - set_input_cond.wait(mutex); + set_input_cond.wait(lock); - size_t nbytes = input->Read(ptr, read_size); + size_t nbytes = input->Read(lock, ptr, read_size); CopyAttributes(); return nbytes; } diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx index 853deb11c..d030e16ed 100644 --- a/src/input/ProxyInputStream.hxx +++ b/src/input/ProxyInputStream.hxx @@ -60,11 +60,13 @@ public: /* virtual methods from InputStream */ void Check() override; void Update() noexcept override; - void Seek(offset_type new_offset) override; + void Seek(std::unique_lock<Mutex> &lock, + offset_type new_offset) override; bool IsEOF() noexcept override; std::unique_ptr<Tag> ReadTag() noexcept override; bool IsAvailable() noexcept override; - size_t Read(void *ptr, size_t read_size) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) override; protected: /** diff --git a/src/input/RewindInputStream.cxx b/src/input/RewindInputStream.cxx index 98f2d1967..cb760adb1 100644 --- a/src/input/RewindInputStream.cxx +++ b/src/input/RewindInputStream.cxx @@ -60,8 +60,9 @@ public: return !ReadingFromBuffer() && ProxyInputStream::IsEOF(); } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override; private: /** @@ -74,7 +75,8 @@ private: }; size_t -RewindInputStream::Read(void *ptr, size_t read_size) +RewindInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) { if (ReadingFromBuffer()) { /* buffered read */ @@ -93,7 +95,7 @@ RewindInputStream::Read(void *ptr, size_t read_size) } else { /* pass method call to underlying stream */ - size_t nbytes = input->Read(ptr, read_size); + size_t nbytes = input->Read(lock, ptr, read_size); if (input->GetOffset() > (offset_type)sizeof(buffer)) /* disable buffering */ @@ -114,7 +116,7 @@ RewindInputStream::Read(void *ptr, size_t read_size) } void -RewindInputStream::Seek(offset_type new_offset) +RewindInputStream::Seek(std::unique_lock<Mutex> &lock, offset_type new_offset) { assert(IsReady()); @@ -132,7 +134,7 @@ RewindInputStream::Seek(offset_type new_offset) buffered range now */ tail = 0; - ProxyInputStream::Seek(new_offset); + ProxyInputStream::Seek(lock, new_offset); } } diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx index 5a21022e1..e1be3ecdc 100644 --- a/src/input/ThreadInputStream.cxx +++ b/src/input/ThreadInputStream.cxx @@ -130,7 +130,8 @@ ThreadInputStream::IsAvailable() noexcept } inline size_t -ThreadInputStream::Read(void *ptr, size_t read_size) +ThreadInputStream::Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t read_size) { assert(!thread.IsInside()); @@ -154,7 +155,7 @@ ThreadInputStream::Read(void *ptr, size_t read_size) return 0; const ScopeExchangeInputStreamHandler h(*this, &cond_handler); - cond_handler.cond.wait(mutex); + cond_handler.cond.wait(lock); } } diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx index 14b849238..3c946e12a 100644 --- a/src/input/ThreadInputStream.hxx +++ b/src/input/ThreadInputStream.hxx @@ -94,7 +94,8 @@ public: void Check() override final; bool IsEOF() noexcept final; bool IsAvailable() noexcept final; - size_t Read(void *ptr, size_t size) override final; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override final; protected: /** diff --git a/src/input/plugins/CdioParanoiaInputPlugin.cxx b/src/input/plugins/CdioParanoiaInputPlugin.cxx index 0079e55a9..9178f8a4a 100644 --- a/src/input/plugins/CdioParanoiaInputPlugin.cxx +++ b/src/input/plugins/CdioParanoiaInputPlugin.cxx @@ -91,8 +91,9 @@ class CdioParanoiaInputStream final : public InputStream { /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override; }; static constexpr Domain cdio_domain("cdio"); @@ -255,7 +256,8 @@ input_cdio_open(const char *uri, } void -CdioParanoiaInputStream::Seek(offset_type new_offset) +CdioParanoiaInputStream::Seek(std::unique_lock<Mutex> &, + offset_type new_offset) { if (new_offset > size) throw FormatRuntimeError("Invalid offset to seek %ld (%ld)", @@ -276,7 +278,8 @@ CdioParanoiaInputStream::Seek(offset_type new_offset) } size_t -CdioParanoiaInputStream::Read(void *ptr, size_t length) +CdioParanoiaInputStream::Read(std::unique_lock<Mutex> &, + void *ptr, size_t length) { size_t nbytes = 0; char *wptr = (char *) ptr; diff --git a/src/input/plugins/FfmpegInputPlugin.cxx b/src/input/plugins/FfmpegInputPlugin.cxx index 198f4bd17..febc44872 100644 --- a/src/input/plugins/FfmpegInputPlugin.cxx +++ b/src/input/plugins/FfmpegInputPlugin.cxx @@ -49,8 +49,10 @@ public: /* virtual methods from InputStream */ bool IsEOF() noexcept override; - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock<Mutex> &lock, + offset_type offset) override; }; gcc_const @@ -79,7 +81,8 @@ input_ffmpeg_open(const char *uri, } size_t -FfmpegInputStream::Read(void *ptr, size_t read_size) +FfmpegInputStream::Read(std::unique_lock<Mutex> &, + void *ptr, size_t read_size) { size_t result; @@ -99,7 +102,7 @@ FfmpegInputStream::IsEOF() noexcept } void -FfmpegInputStream::Seek(offset_type new_offset) +FfmpegInputStream::Seek(std::unique_lock<Mutex> &, offset_type new_offset) { uint64_t result; diff --git a/src/input/plugins/FileInputPlugin.cxx b/src/input/plugins/FileInputPlugin.cxx index 989d3de74..a61c94c2b 100644 --- a/src/input/plugins/FileInputPlugin.cxx +++ b/src/input/plugins/FileInputPlugin.cxx @@ -48,8 +48,10 @@ public: return GetOffset() >= GetSize(); } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock<Mutex> &lock, + offset_type offset) override; }; InputStreamPtr @@ -74,7 +76,8 @@ OpenFileInputStream(Path path, Mutex &mutex) } void -FileInputStream::Seek(offset_type new_offset) +FileInputStream::Seek(std::unique_lock<Mutex> &, + offset_type new_offset) { { const ScopeUnlock unlock(mutex); @@ -85,7 +88,8 @@ FileInputStream::Seek(offset_type new_offset) } size_t -FileInputStream::Read(void *ptr, size_t read_size) +FileInputStream::Read(std::unique_lock<Mutex> &, + void *ptr, size_t read_size) { size_t nbytes; diff --git a/src/input/plugins/SmbclientInputPlugin.cxx b/src/input/plugins/SmbclientInputPlugin.cxx index dfd95103d..361d0e18e 100644 --- a/src/input/plugins/SmbclientInputPlugin.cxx +++ b/src/input/plugins/SmbclientInputPlugin.cxx @@ -56,8 +56,9 @@ public: return offset >= size; } - size_t Read(void *ptr, size_t size) override; - void Seek(offset_type offset) override; + size_t Read(std::unique_lock<Mutex> &lock, + void *ptr, size_t size) override; + void Seek(std::unique_lock<Mutex> &lock, offset_type offset) override; }; /* @@ -118,7 +119,8 @@ input_smbclient_open(const char *uri, } size_t -SmbclientInputStream::Read(void *ptr, size_t read_size) +SmbclientInputStream::Read(std::unique_lock<Mutex> &, + void *ptr, size_t read_size) { ssize_t nbytes; @@ -136,7 +138,8 @@ SmbclientInputStream::Read(void *ptr, size_t read_size) } void -SmbclientInputStream::Seek(offset_type new_offset) +SmbclientInputStream::Seek(std::unique_lock<Mutex> &, + offset_type new_offset) { off_t result; |