summaryrefslogtreecommitdiff
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/AsyncInputStream.cxx24
-rw-r--r--src/input/AsyncInputStream.hxx2
-rw-r--r--src/input/CondHandler.hxx43
-rw-r--r--src/input/FailingInputStream.hxx4
-rw-r--r--src/input/Handler.hxx53
-rw-r--r--src/input/InputPlugin.hxx4
-rw-r--r--src/input/InputStream.cxx36
-rw-r--r--src/input/InputStream.hxx68
-rw-r--r--src/input/LocalOpen.cxx6
-rw-r--r--src/input/LocalOpen.hxx3
-rw-r--r--src/input/Open.cxx27
-rw-r--r--src/input/ProxyInputStream.cxx12
-rw-r--r--src/input/ProxyInputStream.hxx19
-rw-r--r--src/input/ThreadInputStream.cxx14
-rw-r--r--src/input/ThreadInputStream.hxx2
-rw-r--r--src/input/plugins/AlsaInputPlugin.cxx16
-rw-r--r--src/input/plugins/ArchiveInputPlugin.cxx4
-rw-r--r--src/input/plugins/ArchiveInputPlugin.hxx3
-rw-r--r--src/input/plugins/CdioParanoiaInputPlugin.cxx8
-rw-r--r--src/input/plugins/CurlInputPlugin.cxx25
-rw-r--r--src/input/plugins/CurlInputPlugin.hxx3
-rw-r--r--src/input/plugins/FfmpegInputPlugin.cxx8
-rw-r--r--src/input/plugins/FileInputPlugin.cxx9
-rw-r--r--src/input/plugins/FileInputPlugin.hxx4
-rw-r--r--src/input/plugins/MmsInputPlugin.cxx8
-rw-r--r--src/input/plugins/NfsInputPlugin.cxx13
-rw-r--r--src/input/plugins/QobuzInputPlugin.cxx16
-rw-r--r--src/input/plugins/SmbclientInputPlugin.cxx8
-rw-r--r--src/input/plugins/TidalInputPlugin.cxx16
29 files changed, 306 insertions, 152 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx
index 772a87df0..72decc465 100644
--- a/src/input/AsyncInputStream.cxx
+++ b/src/input/AsyncInputStream.cxx
@@ -19,6 +19,7 @@
#include "config.h"
#include "AsyncInputStream.hxx"
+#include "CondHandler.hxx"
#include "tag/Tag.hxx"
#include "thread/Cond.hxx"
#include "event/Loop.hxx"
@@ -29,10 +30,10 @@
#include <string.h>
AsyncInputStream::AsyncInputStream(EventLoop &event_loop, const char *_url,
- Mutex &_mutex, Cond &_cond,
+ Mutex &_mutex,
size_t _buffer_size,
size_t _resume_at)
- :InputStream(_url, _mutex, _cond),
+ :InputStream(_url, _mutex),
deferred_resume(event_loop, BIND_THIS_METHOD(DeferredResume)),
deferred_seek(event_loop, BIND_THIS_METHOD(DeferredSeek)),
allocation(_buffer_size),
@@ -133,8 +134,10 @@ AsyncInputStream::Seek(offset_type new_offset)
deferred_seek.Schedule();
+ CondInputStreamHandler cond_handler;
+ const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
while (seek_state != SeekState::NONE)
- cond.wait(mutex);
+ cond_handler.cond.wait(mutex);
Check();
}
@@ -151,7 +154,7 @@ AsyncInputStream::SeekDone() noexcept
open = true;
seek_state = SeekState::NONE;
- cond.broadcast();
+ InvokeOnAvailable();
}
std::unique_ptr<Tag>
@@ -173,6 +176,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
{
assert(!GetEventLoop().IsInside());
+ CondInputStreamHandler cond_handler;
+
/* wait for data */
CircularBuffer<uint8_t>::Range r;
while (true) {
@@ -182,7 +187,8 @@ AsyncInputStream::Read(void *ptr, size_t read_size)
if (!r.empty() || IsEOF())
break;
- cond.wait(mutex);
+ const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
+ cond_handler.cond.wait(mutex);
}
const size_t nbytes = std::min(read_size, r.size);
@@ -205,7 +211,7 @@ AsyncInputStream::CommitWriteBuffer(size_t nbytes) noexcept
if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
}
void
@@ -231,7 +237,7 @@ AsyncInputStream::AppendToBuffer(const void *data, size_t append_size) noexcept
if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
}
void
@@ -243,7 +249,7 @@ AsyncInputStream::DeferredResume() noexcept
Resume();
} catch (...) {
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
}
}
@@ -265,6 +271,6 @@ AsyncInputStream::DeferredSeek() noexcept
} catch (...) {
seek_state = SeekState::NONE;
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
}
}
diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx
index 117ebfc3a..c4bc1aca4 100644
--- a/src/input/AsyncInputStream.hxx
+++ b/src/input/AsyncInputStream.hxx
@@ -70,7 +70,7 @@ protected:
public:
AsyncInputStream(EventLoop &event_loop, const char *_url,
- Mutex &_mutex, Cond &_cond,
+ Mutex &_mutex,
size_t _buffer_size,
size_t _resume_at);
diff --git a/src/input/CondHandler.hxx b/src/input/CondHandler.hxx
new file mode 100644
index 000000000..70538d25d
--- /dev/null
+++ b/src/input/CondHandler.hxx
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2003-2018 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_COND_INPUT_STREAM_HANDLER_HXX
+#define MPD_COND_INPUT_STREAM_HANDLER_HXX
+
+#include "check.h"
+#include "Handler.hxx"
+#include "thread/Cond.hxx"
+
+/**
+ * An #InputStreamHandler implementation which signals a #Cond.
+ */
+struct CondInputStreamHandler final : InputStreamHandler {
+ Cond cond;
+
+ /* virtual methods from class InputStreamHandler */
+ void OnInputStreamReady() noexcept override {
+ cond.signal();
+ }
+
+ void OnInputStreamAvailable() noexcept override {
+ cond.signal();
+ }
+};
+
+#endif
diff --git a/src/input/FailingInputStream.hxx b/src/input/FailingInputStream.hxx
index a5b37d00e..d8436f195 100644
--- a/src/input/FailingInputStream.hxx
+++ b/src/input/FailingInputStream.hxx
@@ -35,8 +35,8 @@ class FailingInputStream final : public InputStream {
public:
explicit FailingInputStream(const char *_uri,
const std::exception_ptr _error,
- Mutex &_mutex, Cond &_cond) noexcept
- :InputStream(_uri, _mutex, _cond), error(_error) {
+ Mutex &_mutex) noexcept
+ :InputStream(_uri, _mutex), error(_error) {
SetReady();
}
diff --git a/src/input/Handler.hxx b/src/input/Handler.hxx
new file mode 100644
index 000000000..533e51518
--- /dev/null
+++ b/src/input/Handler.hxx
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2003-2018 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_INPUT_STREAM_HANDLER_HXX
+#define MPD_INPUT_STREAM_HANDLER_HXX
+
+#include "check.h"
+
+/**
+ * An interface which gets receives events from an #InputStream. Its
+ * methods will be called from within an arbitrary thread and must not
+ * block.
+ *
+ * A reference to an instance is passed to the #InputStream, but it
+ * remains owned by the caller.
+ */
+class InputStreamHandler {
+public:
+ /**
+ * Called when InputStream::IsReady() becomes true.
+ *
+ * Before querying metadata from the #InputStream,
+ * InputStream::Update() must be called.
+ *
+ * Caller locks InputStream::mutex.
+ */
+ virtual void OnInputStreamReady() noexcept = 0;
+
+ /**
+ * Called when InputStream::IsAvailable() becomes true.
+ *
+ * Caller locks InputStream::mutex.
+ */
+ virtual void OnInputStreamAvailable() noexcept = 0;
+};
+
+#endif
diff --git a/src/input/InputPlugin.hxx b/src/input/InputPlugin.hxx
index 7e278f4ae..0a235f51a 100644
--- a/src/input/InputPlugin.hxx
+++ b/src/input/InputPlugin.hxx
@@ -24,7 +24,6 @@
struct ConfigBlock;
class Mutex;
-class Cond;
class EventLoop;
class RemoteTagScanner;
class RemoteTagHandler;
@@ -54,8 +53,7 @@ struct InputPlugin {
*
* Throws std::runtime_error on error.
*/
- InputStreamPtr (*open)(const char *uri,
- Mutex &mutex, Cond &cond);
+ InputStreamPtr (*open)(const char *uri, Mutex &mutex);
/**
* Prepare a #RemoteTagScanner. The operation must be started
diff --git a/src/input/InputStream.cxx b/src/input/InputStream.cxx
index c1d6b8c08..6dd53456d 100644
--- a/src/input/InputStream.cxx
+++ b/src/input/InputStream.cxx
@@ -19,8 +19,8 @@
#include "config.h"
#include "InputStream.hxx"
+#include "Handler.hxx"
#include "tag/Tag.hxx"
-#include "thread/Cond.hxx"
#include "util/StringCompare.hxx"
#include <stdexcept>
@@ -47,26 +47,8 @@ InputStream::SetReady() noexcept
assert(!ready);
ready = true;
- cond.broadcast();
-}
-void
-InputStream::WaitReady() noexcept
-{
- while (true) {
- Update();
- if (ready)
- break;
-
- cond.wait(mutex);
- }
-}
-
-void
-InputStream::LockWaitReady() noexcept
-{
- const std::lock_guard<Mutex> protect(mutex);
- WaitReady();
+ InvokeOnReady();
}
/**
@@ -177,3 +159,17 @@ InputStream::LockIsEOF() noexcept
const std::lock_guard<Mutex> protect(mutex);
return IsEOF();
}
+
+void
+InputStream::InvokeOnReady() noexcept
+{
+ if (handler != nullptr)
+ handler->OnInputStreamReady();
+}
+
+void
+InputStream::InvokeOnAvailable() noexcept
+{
+ if (handler != nullptr)
+ handler->OnInputStreamAvailable();
+}
diff --git a/src/input/InputStream.hxx b/src/input/InputStream.hxx
index f31c63ae4..331ec96bb 100644
--- a/src/input/InputStream.hxx
+++ b/src/input/InputStream.hxx
@@ -31,8 +31,8 @@
#include <assert.h>
-class Cond;
struct Tag;
+class InputStreamHandler;
class InputStream {
public:
@@ -55,6 +55,7 @@ public:
*/
Mutex &mutex;
+private:
/**
* A cond that gets signalled when the state of this object
* changes from the I/O thread. The client of this object may
@@ -63,7 +64,7 @@ public:
* This object is allocated by the client, and the client is
* responsible for freeing it.
*/
- Cond &cond;
+ InputStreamHandler *handler = nullptr;
protected:
/**
@@ -96,9 +97,9 @@ private:
std::string mime;
public:
- InputStream(const char *_uri, Mutex &_mutex, Cond &_cond) noexcept
+ InputStream(const char *_uri, Mutex &_mutex) noexcept
:uri(_uri),
- mutex(_mutex), cond(_cond) {
+ mutex(_mutex) {
assert(_uri != nullptr);
}
@@ -122,16 +123,33 @@ public:
* notifications
* @return an #InputStream object on success
*/
- gcc_nonnull_all
- static InputStreamPtr Open(const char *uri, Mutex &mutex, Cond &cond);
+ gcc_nonnull(1)
+ static InputStreamPtr Open(const char *uri, Mutex &mutex);
/**
* Just like Open(), but waits for the stream to become ready.
* It is a wrapper for Open(), WaitReady() and Check().
*/
- gcc_nonnull_all
- static InputStreamPtr OpenReady(const char *uri,
- Mutex &mutex, Cond &cond);
+ gcc_nonnull(1)
+ static InputStreamPtr OpenReady(const char *uri, Mutex &mutex);
+
+ /**
+ * Install a new handler.
+ *
+ * The caller must lock the mutex.
+ */
+ void SetHandler(InputStreamHandler *new_handler) noexcept {
+ handler = new_handler;
+ }
+
+ /**
+ * Install a new handler and return the old one.
+ *
+ * The caller must lock the mutex.
+ */
+ InputStreamHandler *ExchangeHandler(InputStreamHandler *new_handler) noexcept {
+ return std::exchange(handler, new_handler);
+ }
/**
* The absolute URI which was used to open this stream.
@@ -166,14 +184,6 @@ public:
return ready;
}
- void WaitReady() noexcept;
-
- /**
- * Wrapper for WaitReady() which locks and unlocks the mutex;
- * the caller must not be holding it already.
- */
- void LockWaitReady() noexcept;
-
gcc_pure
bool HasMimeType() const noexcept {
assert(ready);
@@ -380,6 +390,30 @@ public:
*/
gcc_nonnull_all
void LockReadFull(void *ptr, size_t size);
+
+protected:
+ void InvokeOnReady() noexcept;
+ void InvokeOnAvailable() noexcept;
+};
+
+/**
+ * Install an #InputStreamHandler during the scope in which this
+ * variable lives, and restore the old handler afterwards.
+ */
+class ScopeExchangeInputStreamHandler {
+ InputStream &is;
+ InputStreamHandler *const old_handler;
+
+public:
+ ScopeExchangeInputStreamHandler(InputStream &_is,
+ InputStreamHandler *new_handler) noexcept
+ :is(_is), old_handler(is.ExchangeHandler(new_handler)) {}
+
+ ScopeExchangeInputStreamHandler(const ScopeExchangeInputStreamHandler &) = delete;
+
+ ~ScopeExchangeInputStreamHandler() noexcept {
+ is.SetHandler(old_handler);
+ }
};
#endif
diff --git a/src/input/LocalOpen.cxx b/src/input/LocalOpen.cxx
index 06129f2a5..a5043fe22 100644
--- a/src/input/LocalOpen.cxx
+++ b/src/input/LocalOpen.cxx
@@ -32,20 +32,20 @@
#include <assert.h>
InputStreamPtr
-OpenLocalInputStream(Path path, Mutex &mutex, Cond &cond)
+OpenLocalInputStream(Path path, Mutex &mutex)
{
InputStreamPtr is;
#ifdef ENABLE_ARCHIVE
try {
#endif
- is = OpenFileInputStream(path, mutex, cond);
+ is = OpenFileInputStream(path, mutex);
#ifdef ENABLE_ARCHIVE
} catch (const std::system_error &e) {
if (IsPathNotFound(e)) {
/* ENOTDIR means this may be a path inside an archive
file */
- is = OpenArchiveInputStream(path, mutex, cond);
+ is = OpenArchiveInputStream(path, mutex);
if (!is)
throw;
} else
diff --git a/src/input/LocalOpen.hxx b/src/input/LocalOpen.hxx
index e73b3ee00..902b78c30 100644
--- a/src/input/LocalOpen.hxx
+++ b/src/input/LocalOpen.hxx
@@ -25,7 +25,6 @@
class Path;
class Mutex;
-class Cond;
/**
* Open a "local" file. This is a wrapper for the input plugins
@@ -34,6 +33,6 @@ class Cond;
* Throws std::runtime_error on error.
*/
InputStreamPtr
-OpenLocalInputStream(Path path, Mutex &mutex, Cond &cond);
+OpenLocalInputStream(Path path, Mutex &mutex);
#endif
diff --git a/src/input/Open.cxx b/src/input/Open.cxx
index 9c136b064..22400c142 100644
--- a/src/input/Open.cxx
+++ b/src/input/Open.cxx
@@ -22,6 +22,7 @@
#include "Registry.hxx"
#include "InputPlugin.hxx"
#include "LocalOpen.hxx"
+#include "CondHandler.hxx"
#include "RewindInputStream.hxx"
#include "fs/Traits.hxx"
#include "fs/AllocatedPath.hxx"
@@ -29,16 +30,15 @@
#include <stdexcept>
InputStreamPtr
-InputStream::Open(const char *url,
- Mutex &mutex, Cond &cond)
+InputStream::Open(const char *url, Mutex &mutex)
{
if (PathTraitsUTF8::IsAbsolute(url)) {
const auto path = AllocatedPath::FromUTF8Throw(url);
- return OpenLocalInputStream(path, mutex, cond);
+ return OpenLocalInputStream(path, mutex);
}
input_plugins_for_each_enabled(plugin) {
- auto is = plugin->open(url, mutex, cond);
+ auto is = plugin->open(url, mutex);
if (is != nullptr)
return input_rewind_open(std::move(is));
}
@@ -47,16 +47,27 @@ InputStream::Open(const char *url,
}
InputStreamPtr
-InputStream::OpenReady(const char *uri,
- Mutex &mutex, Cond &cond)
+InputStream::OpenReady(const char *uri, Mutex &mutex)
{
- auto is = Open(uri, mutex, cond);
+ CondInputStreamHandler handler;
+
+ auto is = Open(uri, mutex);
+ is->SetHandler(&handler);
{
const std::lock_guard<Mutex> protect(mutex);
- is->WaitReady();
+
+ while (true) {
+ is->Update();
+ if (is->IsReady())
+ break;
+
+ handler.cond.wait(mutex);
+ }
+
is->Check();
}
+ is->SetHandler(nullptr);
return is;
}
diff --git a/src/input/ProxyInputStream.cxx b/src/input/ProxyInputStream.cxx
index 86d2e82aa..d78664173 100644
--- a/src/input/ProxyInputStream.cxx
+++ b/src/input/ProxyInputStream.cxx
@@ -20,15 +20,16 @@
#include "config.h"
#include "ProxyInputStream.hxx"
#include "tag/Tag.hxx"
-#include "thread/Cond.hxx"
#include <stdexcept>
ProxyInputStream::ProxyInputStream(InputStreamPtr _input) noexcept
- :InputStream(_input->GetURI(), _input->mutex, _input->cond),
+ :InputStream(_input->GetURI(), _input->mutex),
input(std::move(_input))
{
assert(input);
+
+ input->SetHandler(this);
}
ProxyInputStream::~ProxyInputStream() noexcept = default;
@@ -40,10 +41,13 @@ ProxyInputStream::SetInput(InputStreamPtr _input) noexcept
assert(_input);
input = std::move(_input);
+ input->SetHandler(this);
/* this call wakes up client threads if the new input is
ready */
CopyAttributes();
+
+ set_input_cond.signal();
}
void
@@ -89,7 +93,7 @@ void
ProxyInputStream::Seek(offset_type new_offset)
{
while (!input)
- cond.wait(mutex);
+ set_input_cond.wait(mutex);
input->Seek(new_offset);
CopyAttributes();
@@ -120,7 +124,7 @@ size_t
ProxyInputStream::Read(void *ptr, size_t read_size)
{
while (!input)
- cond.wait(mutex);
+ set_input_cond.wait(mutex);
size_t nbytes = input->Read(ptr, read_size);
CopyAttributes();
diff --git a/src/input/ProxyInputStream.hxx b/src/input/ProxyInputStream.hxx
index e55330a64..8dbc7f701 100644
--- a/src/input/ProxyInputStream.hxx
+++ b/src/input/ProxyInputStream.hxx
@@ -22,6 +22,8 @@
#include "InputStream.hxx"
#include "Ptr.hxx"
+#include "Handler.hxx"
+#include "thread/Cond.hxx"
struct Tag;
@@ -33,7 +35,9 @@ struct Tag;
* The inner #InputStream instance may be nullptr initially, to be set
* later.
*/
-class ProxyInputStream : public InputStream {
+class ProxyInputStream : public InputStream, protected InputStreamHandler {
+ Cond set_input_cond;
+
protected:
InputStreamPtr input;
@@ -45,8 +49,8 @@ public:
* Once that instance becomes available, call SetInput().
*/
ProxyInputStream(const char *_uri,
- Mutex &_mutex, Cond &_cond) noexcept
- :InputStream(_uri, _mutex, _cond) {}
+ Mutex &_mutex) noexcept
+ :InputStream(_uri, _mutex) {}
virtual ~ProxyInputStream() noexcept;
@@ -78,6 +82,15 @@ protected:
* attributes.
*/
void CopyAttributes();
+
+ /* virtual methods from class InputStreamHandler */
+ void OnInputStreamReady() noexcept override {
+ InvokeOnReady();
+ }
+
+ void OnInputStreamAvailable() noexcept override {
+ InvokeOnAvailable();
+ }
};
#endif
diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx
index f33f8dbd7..1c7357262 100644
--- a/src/input/ThreadInputStream.cxx
+++ b/src/input/ThreadInputStream.cxx
@@ -19,6 +19,7 @@
#include "config.h"
#include "ThreadInputStream.hxx"
+#include "CondHandler.hxx"
#include "thread/Name.hxx"
#include <assert.h>
@@ -26,9 +27,9 @@
ThreadInputStream::ThreadInputStream(const char *_plugin,
const char *_uri,
- Mutex &_mutex, Cond &_cond,
+ Mutex &_mutex,
size_t _buffer_size) noexcept
- :InputStream(_uri, _mutex, _cond),
+ :InputStream(_uri, _mutex),
plugin(_plugin),
thread(BIND_THIS_METHOD(ThreadFunc)),
allocation(_buffer_size),
@@ -94,11 +95,11 @@ ThreadInputStream::ThreadFunc() noexcept
nbytes = ThreadRead(w.data, w.size);
} catch (...) {
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
break;
}
- cond.broadcast();
+ InvokeOnAvailable();
if (nbytes == 0) {
eof = true;
@@ -134,6 +135,8 @@ ThreadInputStream::Read(void *ptr, size_t read_size)
{
assert(!thread.IsInside());
+ CondInputStreamHandler cond_handler;
+
while (true) {
if (postponed_exception)
std::rethrow_exception(postponed_exception);
@@ -151,7 +154,8 @@ ThreadInputStream::Read(void *ptr, size_t read_size)
if (eof)
return 0;
- cond.wait(mutex);
+ const ScopeExchangeInputStreamHandler h(*this, &cond_handler);
+ cond_handler.cond.wait(mutex);
}
}
diff --git a/src/input/ThreadInputStream.hxx b/src/input/ThreadInputStream.hxx
index 4883dae24..cb97ac771 100644
--- a/src/input/ThreadInputStream.hxx
+++ b/src/input/ThreadInputStream.hxx
@@ -76,7 +76,7 @@ class ThreadInputStream : public InputStream {
public:
ThreadInputStream(const char *_plugin,
- const char *_uri, Mutex &_mutex, Cond &_cond,
+ const char *_uri, Mutex &_mutex,
size_t _buffer_size) noexcept;
#ifndef NDEBUG
diff --git a/src/input/plugins/AlsaInputPlugin.cxx b/src/input/plugins/AlsaInputPlugin.cxx
index ee8fdf55a..0cfdb59f5 100644
--- a/src/input/plugins/AlsaInputPlugin.cxx
+++ b/src/input/plugins/AlsaInputPlugin.cxx
@@ -75,10 +75,10 @@ class AlsaInputStream final
public:
AlsaInputStream(EventLoop &_loop,
- const char *_uri, Mutex &_mutex, Cond &_cond,
+ const char *_uri, Mutex &_mutex,
const char *_device,
snd_pcm_t *_handle, int _frame_size)
- :AsyncInputStream(_loop, _uri, _mutex, _cond,
+ :AsyncInputStream(_loop, _uri, _mutex,
ALSA_MAX_BUFFERED, ALSA_RESUME_AT),
MultiSocketMonitor(_loop),
device(_device),
@@ -111,7 +111,7 @@ public:
}
static InputStreamPtr Create(EventLoop &event_loop, const char *uri,
- Mutex &mutex, Cond &cond);
+ Mutex &mutex);
protected:
/* virtual methods from AsyncInputStream */
@@ -148,7 +148,7 @@ private:
inline InputStreamPtr
AlsaInputStream::Create(EventLoop &event_loop, const char *uri,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
const char *device = StringAfterPrefix(uri, "alsa://");
if (device == nullptr)
@@ -168,7 +168,7 @@ AlsaInputStream::Create(EventLoop &event_loop, const char *uri,
int frame_size = snd_pcm_format_width(format) / 8 * channels;
return std::make_unique<AlsaInputStream>(event_loop,
- uri, mutex, cond,
+ uri, mutex,
device, handle, frame_size);
}
@@ -204,7 +204,7 @@ AlsaInputStream::DispatchSockets() noexcept
if (Recover(n_frames) < 0) {
postponed_exception = std::make_exception_ptr(std::runtime_error("PCM error - stream aborted"));
- cond.broadcast();
+ InvokeOnAvailable();
return;
}
}
@@ -403,10 +403,10 @@ alsa_input_init(EventLoop &event_loop, const ConfigBlock &)
}
static InputStreamPtr
-alsa_input_open(const char *uri, Mutex &mutex, Cond &cond)
+alsa_input_open(const char *uri, Mutex &mutex)
{
return AlsaInputStream::Create(*alsa_input_event_loop, uri,
- mutex, cond);
+ mutex);
}
const struct InputPlugin input_plugin_alsa = {
diff --git a/src/input/plugins/ArchiveInputPlugin.cxx b/src/input/plugins/ArchiveInputPlugin.cxx
index 9c26c7939..cfede2253 100644
--- a/src/input/plugins/ArchiveInputPlugin.cxx
+++ b/src/input/plugins/ArchiveInputPlugin.cxx
@@ -35,7 +35,7 @@
#include <stdlib.h>
InputStreamPtr
-OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond)
+OpenArchiveInputStream(Path path, Mutex &mutex)
{
const ArchivePlugin *arplug;
@@ -61,5 +61,5 @@ OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond)
}
return archive_file_open(arplug, Path::FromFS(archive))
- ->OpenStream(filename, mutex, cond);
+ ->OpenStream(filename, mutex);
}
diff --git a/src/input/plugins/ArchiveInputPlugin.hxx b/src/input/plugins/ArchiveInputPlugin.hxx
index 5e8934052..9aa2b0bfe 100644
--- a/src/input/plugins/ArchiveInputPlugin.hxx
+++ b/src/input/plugins/ArchiveInputPlugin.hxx
@@ -24,9 +24,8 @@
class Path;
class Mutex;
-class Cond;
InputStreamPtr
-OpenArchiveInputStream(Path path, Mutex &mutex, Cond &cond);
+OpenArchiveInputStream(Path path, Mutex &mutex);
#endif
diff --git a/src/input/plugins/CdioParanoiaInputPlugin.cxx b/src/input/plugins/CdioParanoiaInputPlugin.cxx
index d81577bb7..22121efc9 100644
--- a/src/input/plugins/CdioParanoiaInputPlugin.cxx
+++ b/src/input/plugins/CdioParanoiaInputPlugin.cxx
@@ -61,11 +61,11 @@ class CdioParanoiaInputStream final : public InputStream {
int buffer_lsn;
public:
- CdioParanoiaInputStream(const char *_uri, Mutex &_mutex, Cond &_cond,
+ CdioParanoiaInputStream(const char *_uri, Mutex &_mutex,
cdrom_drive_t *_drv, CdIo_t *_cdio,
bool reverse_endian,
lsn_t _lsn_from, lsn_t _lsn_to)
- :InputStream(_uri, _mutex, _cond),
+ :InputStream(_uri, _mutex),
drv(_drv), cdio(_cdio), para(cdio_paranoia_init(drv)),
lsn_from(_lsn_from), lsn_to(_lsn_to),
lsn_relofs(0),
@@ -184,7 +184,7 @@ cdio_detect_device(void)
static InputStreamPtr
input_cdio_open(const char *uri,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
struct cdio_uri parsed_uri;
if (!parse_cdio_uri(&parsed_uri, uri))
@@ -250,7 +250,7 @@ input_cdio_open(const char *uri,
lsn_to = cdio_get_disc_last_lsn(cdio);
}
- return std::make_unique<CdioParanoiaInputStream>(uri, mutex, cond,
+ return std::make_unique<CdioParanoiaInputStream>(uri, mutex,
drv, cdio,
reverse_endian,
lsn_from, lsn_to);
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx
index 69876a463..1cca33aa6 100644
--- a/src/input/plugins/CurlInputPlugin.cxx
+++ b/src/input/plugins/CurlInputPlugin.cxx
@@ -35,7 +35,6 @@
#include "tag/Tag.hxx"
#include "event/Call.hxx"
#include "event/Loop.hxx"
-#include "thread/Cond.hxx"
#include "util/ASCII.hxx"
#include "util/StringUtil.hxx"
#include "util/StringFormat.hxx"
@@ -83,7 +82,7 @@ public:
CurlInputStream(EventLoop &event_loop, const char *_url,
const std::multimap<std::string, std::string> &headers,
I &&_icy,
- Mutex &_mutex, Cond &_cond);
+ Mutex &_mutex);
~CurlInputStream() noexcept;
@@ -92,7 +91,7 @@ public:
static InputStreamPtr Open(const char *url,
const std::multimap<std::string, std::string> &headers,
- Mutex &mutex, Cond &cond);
+ Mutex &mutex);
private:
/**
@@ -274,7 +273,7 @@ void
CurlInputStream::OnEnd()
{
const std::lock_guard<Mutex> protect(mutex);
- cond.broadcast();
+ InvokeOnAvailable();
AsyncInputStream::SetClosed();
}
@@ -290,7 +289,7 @@ CurlInputStream::OnError(std::exception_ptr e) noexcept
else if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
AsyncInputStream::SetClosed();
}
@@ -352,8 +351,8 @@ inline
CurlInputStream::CurlInputStream(EventLoop &event_loop, const char *_url,
const std::multimap<std::string, std::string> &headers,
I &&_icy,
- Mutex &_mutex, Cond &_cond)
- :AsyncInputStream(event_loop, _url, _mutex, _cond,
+ Mutex &_mutex)
+ :AsyncInputStream(event_loop, _url, _mutex,
CURL_MAX_BUFFERED,
CURL_RESUME_AT),
icy(std::forward<I>(_icy))
@@ -445,14 +444,14 @@ CurlInputStream::DoSeek(offset_type new_offset)
inline InputStreamPtr
CurlInputStream::Open(const char *url,
const std::multimap<std::string, std::string> &headers,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
auto icy = std::make_shared<IcyMetaDataParser>();
auto c = std::make_unique<CurlInputStream>((*curl_init)->GetEventLoop(),
url, headers,
icy,
- mutex, cond);
+ mutex);
BlockingCall(c->GetEventLoop(), [&c](){
c->InitEasy();
@@ -465,19 +464,19 @@ CurlInputStream::Open(const char *url,
InputStreamPtr
OpenCurlInputStream(const char *uri,
const std::multimap<std::string, std::string> &headers,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
- return CurlInputStream::Open(uri, headers, mutex, cond);
+ return CurlInputStream::Open(uri, headers, mutex);
}
static InputStreamPtr
-input_curl_open(const char *url, Mutex &mutex, Cond &cond)
+input_curl_open(const char *url, Mutex &mutex)
{
if (strncmp(url, "http://", 7) != 0 &&
strncmp(url, "https://", 8) != 0)
return nullptr;
- return CurlInputStream::Open(url, {}, mutex, cond);
+ return CurlInputStream::Open(url, {}, mutex);
}
const struct InputPlugin input_plugin_curl = {
diff --git a/src/input/plugins/CurlInputPlugin.hxx b/src/input/plugins/CurlInputPlugin.hxx
index 478acd43f..bd64f09d3 100644
--- a/src/input/plugins/CurlInputPlugin.hxx
+++ b/src/input/plugins/CurlInputPlugin.hxx
@@ -26,7 +26,6 @@
#include <map>
class Mutex;
-class Cond;
extern const struct InputPlugin input_plugin_curl;
@@ -40,6 +39,6 @@ extern const struct InputPlugin input_plugin_curl;
InputStreamPtr
OpenCurlInputStream(const char *uri,
const std::multimap<std::string, std::string> &headers,
- Mutex &mutex, Cond &cond);
+ Mutex &mutex);
#endif
diff --git a/src/input/plugins/FfmpegInputPlugin.cxx b/src/input/plugins/FfmpegInputPlugin.cxx
index b78d1dd13..9a8dc9833 100644
--- a/src/input/plugins/FfmpegInputPlugin.cxx
+++ b/src/input/plugins/FfmpegInputPlugin.cxx
@@ -39,9 +39,9 @@ struct FfmpegInputStream final : public InputStream {
bool eof;
- FfmpegInputStream(const char *_uri, Mutex &_mutex, Cond &_cond,
+ FfmpegInputStream(const char *_uri, Mutex &_mutex,
AVIOContext *_h)
- :InputStream(_uri, _mutex, _cond),
+ :InputStream(_uri, _mutex),
h(_h), eof(false) {
seekable = (h->seekable & AVIO_SEEKABLE_NORMAL) != 0;
size = avio_size(h);
@@ -83,7 +83,7 @@ input_ffmpeg_init(EventLoop &, const ConfigBlock &)
static InputStreamPtr
input_ffmpeg_open(const char *uri,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
if (!StringStartsWith(uri, "gopher://") &&
!StringStartsWith(uri, "rtp://") &&
@@ -98,7 +98,7 @@ input_ffmpeg_open(const char *uri,
if (result != 0)
throw MakeFfmpegError(result);
- return std::make_unique<FfmpegInputStream>(uri, mutex, cond, h);
+ return std::make_unique<FfmpegInputStream>(uri, mutex, h);
}
size_t
diff --git a/src/input/plugins/FileInputPlugin.cxx b/src/input/plugins/FileInputPlugin.cxx
index cc8e62413..f757c9a93 100644
--- a/src/input/plugins/FileInputPlugin.cxx
+++ b/src/input/plugins/FileInputPlugin.cxx
@@ -35,8 +35,8 @@ class FileInputStream final : public InputStream {
public:
FileInputStream(const char *path, FileReader &&_reader, off_t _size,
- Mutex &_mutex, Cond &_cond)
- :InputStream(path, _mutex, _cond),
+ Mutex &_mutex)
+ :InputStream(path, _mutex),
reader(std::move(_reader)) {
size = _size;
seekable = true;
@@ -54,8 +54,7 @@ public:
};
InputStreamPtr
-OpenFileInputStream(Path path,
- Mutex &mutex, Cond &cond)
+OpenFileInputStream(Path path, Mutex &mutex)
{
FileReader reader(path);
@@ -75,7 +74,7 @@ OpenFileInputStream(Path path,
return std::make_unique<FileInputStream>(path.ToUTF8().c_str(),
std::move(reader), info.GetSize(),
- mutex, cond);
+ mutex);
}
void
diff --git a/src/input/plugins/FileInputPlugin.hxx b/src/input/plugins/FileInputPlugin.hxx
index 4a5528185..867564d3c 100644
--- a/src/input/plugins/FileInputPlugin.hxx
+++ b/src/input/plugins/FileInputPlugin.hxx
@@ -24,10 +24,8 @@
class Path;
class Mutex;
-class Cond;
InputStreamPtr
-OpenFileInputStream(Path path,
- Mutex &mutex, Cond &cond);
+OpenFileInputStream(Path path, Mutex &mutex);
#endif
diff --git a/src/input/plugins/MmsInputPlugin.cxx b/src/input/plugins/MmsInputPlugin.cxx
index de35e2f15..87cbcc520 100644
--- a/src/input/plugins/MmsInputPlugin.cxx
+++ b/src/input/plugins/MmsInputPlugin.cxx
@@ -34,8 +34,8 @@ class MmsInputStream final : public ThreadInputStream {
mmsx_t *mms;
public:
- MmsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond)
- :ThreadInputStream(input_plugin_mms.name, _uri, _mutex, _cond,
+ MmsInputStream(const char *_uri, Mutex &_mutex)
+ :ThreadInputStream(input_plugin_mms.name, _uri, _mutex,
MMS_BUFFER_SIZE) {
}
@@ -70,7 +70,7 @@ MmsInputStream::Open()
static InputStreamPtr
input_mms_open(const char *url,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
if (!StringStartsWith(url, "mms://") &&
!StringStartsWith(url, "mmsh://") &&
@@ -78,7 +78,7 @@ input_mms_open(const char *url,
!StringStartsWith(url, "mmsu://"))
return nullptr;
- auto m = std::make_unique<MmsInputStream>(url, mutex, cond);
+ auto m = std::make_unique<MmsInputStream>(url, mutex);
m->Start();
return m;
}
diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx
index ffee929b6..35ee78f4c 100644
--- a/src/input/plugins/NfsInputPlugin.cxx
+++ b/src/input/plugins/NfsInputPlugin.cxx
@@ -23,7 +23,6 @@
#include "../InputPlugin.hxx"
#include "lib/nfs/Glue.hxx"
#include "lib/nfs/FileReader.hxx"
-#include "thread/Cond.hxx"
#include "util/StringCompare.hxx"
#include <string.h>
@@ -46,9 +45,9 @@ class NfsInputStream final : NfsFileReader, public AsyncInputStream {
bool reconnect_on_resume = false, reconnecting = false;
public:
- NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond)
+ NfsInputStream(const char *_uri, Mutex &_mutex)
:AsyncInputStream(NfsFileReader::GetEventLoop(),
- _uri, _mutex, _cond,
+ _uri, _mutex,
NFS_MAX_BUFFERED,
NFS_RESUME_AT) {}
@@ -100,7 +99,7 @@ NfsInputStream::DoRead()
NfsFileReader::Read(next_offset, nbytes);
} catch (...) {
postponed_exception = std::current_exception();
- cond.broadcast();
+ InvokeOnAvailable();
}
}
@@ -196,7 +195,7 @@ NfsInputStream::OnNfsFileError(std::exception_ptr &&e) noexcept
else if (!IsReady())
SetReady();
else
- cond.broadcast();
+ InvokeOnAvailable();
}
/*
@@ -218,12 +217,12 @@ input_nfs_finish() noexcept
static InputStreamPtr
input_nfs_open(const char *uri,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
if (!StringStartsWith(uri, "nfs://"))
return nullptr;
- auto is = std::make_unique<NfsInputStream>(uri, mutex, cond);
+ auto is = std::make_unique<NfsInputStream>(uri, mutex);
is->Open();
return is;
}
diff --git a/src/input/plugins/QobuzInputPlugin.cxx b/src/input/plugins/QobuzInputPlugin.cxx
index e5c4f6d64..44bc5819b 100644
--- a/src/input/plugins/QobuzInputPlugin.cxx
+++ b/src/input/plugins/QobuzInputPlugin.cxx
@@ -49,8 +49,8 @@ class QobuzInputStream final
public:
QobuzInputStream(const char *_uri, const char *_track_id,
- Mutex &_mutex, Cond &_cond) noexcept
- :ProxyInputStream(_uri, _mutex, _cond),
+ Mutex &_mutex) noexcept
+ :ProxyInputStream(_uri, _mutex),
track_id(_track_id)
{
qobuz_client->AddLoginHandler(*this);
@@ -70,7 +70,7 @@ public:
private:
void Failed(std::exception_ptr e) {
SetInput(std::make_unique<FailingInputStream>(GetURI(), e,
- mutex, cond));
+ mutex));
}
/* virtual methods from QobuzSessionHandler */
@@ -89,11 +89,11 @@ QobuzInputStream::OnQobuzSession() noexcept
try {
const auto session = qobuz_client->GetSession();
- QobuzTrackHandler &handler = *this;
+ QobuzTrackHandler &h = *this;
track_request = std::make_unique<QobuzTrackRequest>(*qobuz_client,
session,
track_id.c_str(),
- handler);
+ h);
track_request->Start();
} catch (...) {
Failed(std::current_exception());
@@ -108,7 +108,7 @@ QobuzInputStream::OnQobuzTrackSuccess(std::string url) noexcept
try {
SetInput(OpenCurlInputStream(url.c_str(), {},
- mutex, cond));
+ mutex));
} catch (...) {
Failed(std::current_exception());
}
@@ -180,7 +180,7 @@ ExtractQobuzTrackId(const char *uri)
}
static InputStreamPtr
-OpenQobuzInput(const char *uri, Mutex &mutex, Cond &cond)
+OpenQobuzInput(const char *uri, Mutex &mutex)
{
assert(qobuz_client != nullptr);
@@ -190,7 +190,7 @@ OpenQobuzInput(const char *uri, Mutex &mutex, Cond &cond)
// TODO: validate track_id
- return std::make_unique<QobuzInputStream>(uri, track_id, mutex, cond);
+ return std::make_unique<QobuzInputStream>(uri, track_id, mutex);
}
static std::unique_ptr<RemoteTagScanner>
diff --git a/src/input/plugins/SmbclientInputPlugin.cxx b/src/input/plugins/SmbclientInputPlugin.cxx
index 517c29c2b..00b421ee4 100644
--- a/src/input/plugins/SmbclientInputPlugin.cxx
+++ b/src/input/plugins/SmbclientInputPlugin.cxx
@@ -37,9 +37,9 @@ class SmbclientInputStream final : public InputStream {
public:
SmbclientInputStream(const char *_uri,
- Mutex &_mutex, Cond &_cond,
+ Mutex &_mutex,
SMBCCTX *_ctx, int _fd, const struct stat &st)
- :InputStream(_uri, _mutex, _cond),
+ :InputStream(_uri, _mutex),
ctx(_ctx), fd(_fd) {
seekable = true;
size = st.st_size;
@@ -85,7 +85,7 @@ input_smbclient_init(EventLoop &, const ConfigBlock &)
static InputStreamPtr
input_smbclient_open(const char *uri,
- Mutex &mutex, Cond &cond)
+ Mutex &mutex)
{
if (!StringStartsWith(uri, "smb://"))
return nullptr;
@@ -119,7 +119,7 @@ input_smbclient_open(const char *uri,
throw MakeErrno(e, "smbc_fstat() failed");
}
- return std::make_unique<SmbclientInputStream>(uri, mutex, cond,
+ return std::make_unique<SmbclientInputStream>(uri, mutex,
ctx, fd, st);
}
diff --git a/src/input/plugins/TidalInputPlugin.cxx b/src/input/plugins/TidalInputPlugin.cxx
index cf10bbea6..9f28cdbda 100644
--- a/src/input/plugins/TidalInputPlugin.cxx
+++ b/src/input/plugins/TidalInputPlugin.cxx
@@ -60,8 +60,8 @@ class TidalInputStream final
public:
TidalInputStream(const char *_uri, const char *_track_id,
- Mutex &_mutex, Cond &_cond) noexcept
- :ProxyInputStream(_uri, _mutex, _cond),
+ Mutex &_mutex) noexcept
+ :ProxyInputStream(_uri, _mutex),
track_id(_track_id)
{
tidal_session->AddLoginHandler(*this);
@@ -81,7 +81,7 @@ public:
private:
void Failed(std::exception_ptr e) {
SetInput(std::make_unique<FailingInputStream>(GetURI(), e,
- mutex, cond));
+ mutex));
}
/* virtual methods from TidalSessionHandler */
@@ -98,14 +98,14 @@ TidalInputStream::OnTidalSession() noexcept
const std::lock_guard<Mutex> protect(mutex);
try {
- TidalTrackHandler &handler = *this;
+ TidalTrackHandler &h = *this;
track_request = std::make_unique<TidalTrackRequest>(tidal_session->GetCurl(),
tidal_session->GetBaseUrl(),
tidal_session->GetToken(),
tidal_session->GetSession().c_str(),
track_id.c_str(),
tidal_audioquality,
- handler);
+ h);
track_request->Start();
} catch (...) {
Failed(std::current_exception());
@@ -124,7 +124,7 @@ TidalInputStream::OnTidalTrackSuccess(std::string url) noexcept
try {
SetInput(OpenCurlInputStream(url.c_str(), {},
- mutex, cond));
+ mutex));
} catch (...) {
Failed(std::current_exception());
}
@@ -211,7 +211,7 @@ ExtractTidalTrackId(const char *uri)
}
static InputStreamPtr
-OpenTidalInput(const char *uri, Mutex &mutex, Cond &cond)
+OpenTidalInput(const char *uri, Mutex &mutex)
{
assert(tidal_session != nullptr);
@@ -221,7 +221,7 @@ OpenTidalInput(const char *uri, Mutex &mutex, Cond &cond)
// TODO: validate track_id
- return std::make_unique<TidalInputStream>(uri, track_id, mutex, cond);
+ return std::make_unique<TidalInputStream>(uri, track_id, mutex);
}
static std::unique_ptr<RemoteTagScanner>