summaryrefslogtreecommitdiff
path: root/src/input/AsyncInputStream.hxx
blob: c4bc1aca4f4bf4e5b43cc44959ad8b7ed97dd8d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifndef MPD_ASYNC_INPUT_STREAM_HXX
#define MPD_ASYNC_INPUT_STREAM_HXX

#include "InputStream.hxx"
#include "event/DeferEvent.hxx"
#include "util/HugeAllocator.hxx"
#include "util/CircularBuffer.hxx"

#include <exception>

/**
 * Helper class for moving asynchronous (non-blocking) InputStream
 * implementations to the I/O thread.  Data is being read into a ring
 * buffer, and that buffer is then consumed by another thread using
 * the regular #InputStream API.
 */
class AsyncInputStream : public InputStream {
	enum class SeekState : uint8_t {
		NONE, SCHEDULED, PENDING
	};

	DeferEvent deferred_resume;
	DeferEvent deferred_seek;

	HugeArray<uint8_t> allocation;

	CircularBuffer<uint8_t> buffer;
	const size_t resume_at;

	bool open = true;

	/**
	 * Is the connection currently paused?  That happens when the
	 * buffer was getting too large.  It will be unpaused when the
	 * buffer is below the threshold again.
	 */
	bool paused = false;

	SeekState seek_state = SeekState::NONE;

	/**
	 * The #Tag object ready to be requested via
	 * InputStream::ReadTag().
	 */
	std::unique_ptr<Tag> tag;

	offset_type seek_offset;

protected:
	std::exception_ptr postponed_exception;

public:
	AsyncInputStream(EventLoop &event_loop, const char *_url,
			 Mutex &_mutex,
			 size_t _buffer_size,
			 size_t _resume_at);

	virtual ~AsyncInputStream();

	EventLoop &GetEventLoop() {
		return deferred_resume.GetEventLoop();
	}

	/* virtual methods from InputStream */
	void Check() final;
	bool IsEOF() noexcept final;
	void Seek(offset_type new_offset) final;
	std::unique_ptr<Tag> ReadTag() final;
	bool IsAvailable() noexcept final;
	size_t Read(void *ptr, size_t read_size) final;

protected:
	/**
	 * Pass an tag from the I/O thread to the client thread.
	 */
	void SetTag(std::unique_ptr<Tag> _tag) noexcept;
	void ClearTag() noexcept;

	void Pause() noexcept;

	bool IsPaused() const noexcept {
		return paused;
	}

	/**
	 * Declare that the underlying stream was closed.  We will
	 * continue feeding Read() calls from the buffer until it runs
	 * empty.
	 */
	void SetClosed() noexcept {
		open = false;
	}

	bool IsBufferEmpty() const noexcept {
		return buffer.empty();
	}

	bool IsBufferFull() const noexcept {
		return buffer.IsFull();
	}

	/**
	 * Determine how many bytes can be added to the buffer.
	 */
	gcc_pure
	size_t GetBufferSpace() const noexcept {
		return buffer.GetSpace();
	}

	CircularBuffer<uint8_t>::Range PrepareWriteBuffer() noexcept {
		return buffer.Write();
	}

	void CommitWriteBuffer(size_t nbytes) noexcept;

	/**
	 * Append data to the buffer.  The size must fit into the
	 * buffer; see GetBufferSpace().
	 */
	void AppendToBuffer(const void *data, size_t append_size) noexcept;

	/**
	 * Implement code here that will resume the stream after it
	 * has been paused due to full input buffer.
	 */
	virtual void DoResume() = 0;

	/**
	 * The actual Seek() implementation.  This virtual method will
	 * be called from within the I/O thread.  When the operation
	 * is finished, call SeekDone() to notify the caller.
	 */
	virtual void DoSeek(offset_type new_offset) = 0;

	bool IsSeekPending() const noexcept {
		return seek_state == SeekState::PENDING;
	}

	/**
	 * Call this after seeking has finished.  It will notify the
	 * client thread.
	 */
	void SeekDone() noexcept;

private:
	void Resume();

	/* for DeferEvent */
	void DeferredResume() noexcept;
	void DeferredSeek() noexcept;
};

#endif