summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2013-12-31 16:31:36 +0100
committerMax Kellermann <max@duempel.org>2014-01-04 18:22:55 +0100
commitc9da3363a04bfdf8a0f0b2d974291e422a0345d2 (patch)
treefd79aa539d93da5c12d35f05a104a4c8e82258b9 /src
parent9bd4ed3e60df771241114ed460b8b5943ff57982 (diff)
output/httpd: move all broadcast operations to the IOThread
Add a Page queue to class HttpdOutput, and use DeferredMonitor to flush this queue inside the IOThread. This fixes a thread-safety issue: much of EventLoop is not thread-safe, and the httpd plugin ignored that problem.
Diffstat (limited to 'src')
-rw-r--r--src/output/HttpdInternal.hxx21
-rw-r--r--src/output/HttpdOutputPlugin.cxx66
2 files changed, 76 insertions, 11 deletions
diff --git a/src/output/HttpdInternal.hxx b/src/output/HttpdInternal.hxx
index df9099335..f20d88b4e 100644
--- a/src/output/HttpdInternal.hxx
+++ b/src/output/HttpdInternal.hxx
@@ -29,6 +29,7 @@
#include "Timer.hxx"
#include "thread/Mutex.hxx"
#include "event/ServerSocket.hxx"
+#include "event/DeferredMonitor.hxx"
#ifdef _LIBCPP_VERSION
/* can't use incomplete template arguments with libc++ */
@@ -36,6 +37,8 @@
#endif
#include <forward_list>
+#include <queue>
+#include <list>
struct config_param;
class Error;
@@ -46,7 +49,7 @@ class Page;
struct Encoder;
struct Tag;
-class HttpdOutput final : ServerSocket {
+class HttpdOutput final : ServerSocket, DeferredMonitor {
struct audio_output base;
/**
@@ -80,6 +83,12 @@ public:
*/
mutable Mutex mutex;
+ /**
+ * This condition gets signalled when an item is removed from
+ * #pages.
+ */
+ Cond cond;
+
private:
/**
* A #Timer object to synchronize this output with the
@@ -97,6 +106,14 @@ private:
*/
Page *metadata;
+ /**
+ * The page queue, i.e. pages from the encoder to be
+ * broadcasted to all clients. This container is necessary to
+ * pass pages from the OutputThread to the IOThread. It is
+ * protected by #mutex, and removing signals #cond.
+ */
+ std::queue<Page *, std::list<Page *>> pages;
+
public:
/**
* The configured name.
@@ -248,6 +265,8 @@ public:
void CancelAllClients();
private:
+ virtual void RunDeferred() override;
+
virtual void OnAccept(int fd, const sockaddr &address,
size_t address_length, int uid) override;
};
diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx
index 2790dd98d..fa6934ff0 100644
--- a/src/output/HttpdOutputPlugin.cxx
+++ b/src/output/HttpdOutputPlugin.cxx
@@ -29,6 +29,7 @@
#include "IcyMetaDataServer.hxx"
#include "system/fd_util.h"
#include "IOThread.hxx"
+#include "event/Call.hxx"
#include "util/Error.hxx"
#include "util/Domain.hxx"
#include "Log.hxx"
@@ -49,7 +50,7 @@ const Domain httpd_output_domain("httpd_output");
inline
HttpdOutput::HttpdOutput(EventLoop &_loop)
- :ServerSocket(_loop),
+ :ServerSocket(_loop), DeferredMonitor(_loop),
encoder(nullptr), unflushed_input(0),
metadata(nullptr)
{
@@ -162,7 +163,7 @@ httpd_output_finish(struct audio_output *ao)
inline void
HttpdOutput::AddClient(int fd)
{
- clients.emplace_front(*this, fd, GetEventLoop(),
+ clients.emplace_front(*this, fd, ServerSocket::GetEventLoop(),
encoder->plugin.tag == nullptr);
++clients_cnt;
@@ -172,6 +173,29 @@ HttpdOutput::AddClient(int fd)
}
void
+HttpdOutput::RunDeferred()
+{
+ /* this method runs in the IOThread; it broadcasts pages from
+ our own queue to all clients */
+
+ const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+
+ for (auto &client : clients)
+ client.PushPage(page);
+
+ page->Unref();
+ }
+
+ /* wake up the client that may be waiting for the queue to be
+ flushed */
+ cond.broadcast();
+}
+
+void
HttpdOutput::OnAccept(int fd, const sockaddr &address,
size_t address_length, gcc_unused int uid)
{
@@ -393,19 +417,29 @@ HttpdOutput::BroadcastPage(Page *page)
{
assert(page != nullptr);
- const ScopeLock protect(mutex);
- for (auto &client : clients)
- client.PushPage(page);
+ mutex.lock();
+ pages.push(page);
+ page->Ref();
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
void
HttpdOutput::BroadcastFromEncoder()
{
+ /* synchronize with the IOThread */
+ mutex.lock();
+ while (!pages.empty())
+ cond.wait(mutex);
+
Page *page;
- while ((page = ReadPage()) != nullptr) {
- BroadcastPage(page);
- page->Unref();
- }
+ while ((page = ReadPage()) != nullptr)
+ pages.push(page);
+
+ mutex.unlock();
+
+ DeferredMonitor::Schedule();
}
inline bool
@@ -519,15 +553,27 @@ inline void
HttpdOutput::CancelAllClients()
{
const ScopeLock protect(mutex);
+
+ while (!pages.empty()) {
+ Page *page = pages.front();
+ pages.pop();
+ page->Unref();
+ }
+
for (auto &client : clients)
client.CancelQueue();
+
+ cond.broadcast();
}
static void
httpd_output_cancel(struct audio_output *ao)
{
HttpdOutput *httpd = HttpdOutput::Cast(ao);
- httpd->CancelAllClients();
+
+ BlockingCall(io_thread_get(), [httpd](){
+ httpd->CancelAllClients();
+ });
}
const struct audio_output_plugin httpd_output_plugin = {