Skip to content

Commit 33ab258

Browse files
authored
Merge pull request #140 from beef9999/main
Re-add prefetch
2 parents 718968b + c0e3d8c commit 33ab258

File tree

2 files changed

+406
-0
lines changed

2 files changed

+406
-0
lines changed

src/prefetch.cpp

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
/*
2+
Copyright The Overlaybd Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
#include <memory>
17+
#include <vector>
18+
#include <map>
19+
#include <queue>
20+
#include <unistd.h>
21+
#include <fcntl.h>
22+
#include <sys/stat.h>
23+
#include <sys/mman.h>
24+
25+
#include "prefetch.h"
26+
#include <photon/common/alog.h>
27+
#include <photon/common/alog-stdstring.h>
28+
#include <photon/fs/forwardfs.h>
29+
#include <photon/fs/localfs.h>
30+
#include <photon/thread/thread11.h>
31+
#include "overlaybd/zfile/crc32/crc32c.h"
32+
33+
using namespace std;
34+
35+
class PrefetcherImpl;
36+
37+
class PrefetchFile : public ForwardFile_Ownership {
38+
public:
39+
PrefetchFile(IFile *src_file, uint32_t layer_index, Prefetcher *prefetcher);
40+
41+
ssize_t pread(void *buf, size_t count, off_t offset) override;
42+
43+
private:
44+
uint32_t m_layer_index;
45+
PrefetcherImpl *m_prefetcher;
46+
};
47+
48+
class PrefetcherImpl : public Prefetcher {
49+
public:
50+
explicit PrefetcherImpl(const string &trace_file_path) {
51+
// Detect mode
52+
size_t file_size = 0;
53+
m_mode = detect_mode(trace_file_path, &file_size);
54+
m_lock_file_path = trace_file_path + ".lock";
55+
m_ok_file_path = trace_file_path + ".ok";
56+
LOG_INFO("Prefetch: run with mode `, trace file is `", m_mode, trace_file_path);
57+
58+
// Open trace file
59+
if (m_mode != Mode::Disabled) {
60+
int flags = m_mode == Mode::Record ? O_WRONLY : O_RDONLY;
61+
m_trace_file =
62+
open_localfile_adaptor(trace_file_path.c_str(), flags, 0666);
63+
}
64+
65+
// Loop detect lock file if going to record
66+
if (m_mode == Mode::Record) {
67+
int lock_fd = open(m_lock_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_EXCL, 0666);
68+
close(lock_fd);
69+
auto th = photon::thread_create11(&PrefetcherImpl::detect_lock, this);
70+
m_detect_thread = photon::thread_enable_join(th);
71+
}
72+
73+
// Reload if going to replay
74+
if (m_mode == Mode::Replay) {
75+
reload(file_size);
76+
}
77+
}
78+
79+
~PrefetcherImpl() {
80+
if (m_mode == Mode::Record) {
81+
m_record_stopped = true;
82+
if (m_detect_thread_interruptible) {
83+
photon::thread_shutdown((photon::thread *)m_detect_thread);
84+
}
85+
photon::thread_join(m_detect_thread);
86+
dump();
87+
88+
} else if (m_mode == Mode::Replay) {
89+
m_replay_stopped = true;
90+
for (auto th : m_replay_threads) {
91+
photon::thread_shutdown((photon::thread *)th);
92+
photon::thread_join(th);
93+
}
94+
}
95+
96+
if (m_trace_file != nullptr) {
97+
m_trace_file->close();
98+
m_trace_file = nullptr;
99+
}
100+
}
101+
102+
IFile *new_prefetch_file(IFile *src_file, uint32_t layer_index) override {
103+
return new PrefetchFile(src_file, layer_index, this);
104+
}
105+
106+
void record(TraceOp op, uint32_t layer_index, size_t count, off_t offset) override {
107+
if (m_record_stopped) {
108+
return;
109+
}
110+
TraceFormat trace = {op, layer_index, count, offset};
111+
m_record_array.push_back(trace);
112+
}
113+
114+
void replay() override {
115+
if (m_mode != Mode::Replay) {
116+
return;
117+
}
118+
if (m_replay_queue.empty() || m_src_files.empty()) {
119+
return;
120+
}
121+
LOG_INFO("Prefetch: Replay ` records from ` layers", m_replay_queue.size(),
122+
m_src_files.size());
123+
for (int i = 0; i < REPLAY_CONCURRENCY; ++i) {
124+
auto th = photon::thread_create11(&PrefetcherImpl::replay_worker_thread, this);
125+
auto join_handle = photon::thread_enable_join(th);
126+
m_replay_threads.push_back(join_handle);
127+
}
128+
}
129+
130+
int replay_worker_thread() {
131+
static char buf[MAX_IO_SIZE]; // multi threads reuse one buffer
132+
while (!m_replay_queue.empty() && !m_replay_stopped) {
133+
auto trace = m_replay_queue.front();
134+
m_replay_queue.pop();
135+
auto iter = m_src_files.find(trace.layer_index);
136+
if (iter == m_src_files.end()) {
137+
continue;
138+
}
139+
auto src_file = iter->second;
140+
if (trace.op == PrefetcherImpl::TraceOp::READ) {
141+
ssize_t n_read = src_file->pread(buf, trace.count, trace.offset);
142+
if (n_read != (ssize_t)trace.count) {
143+
LOG_ERROR("Prefetch: replay pread failed: `, `, respect: `, got: `", ERRNO(),
144+
trace, trace.count, n_read);
145+
continue;
146+
}
147+
}
148+
}
149+
photon::thread_sleep(3);
150+
if (!m_buffer_released) {
151+
m_buffer_released = true;
152+
madvise(buf, MAX_IO_SIZE, MADV_DONTNEED);
153+
}
154+
return 0;
155+
}
156+
157+
void register_src_file(uint32_t layer_index, IFile *src_file) {
158+
m_src_files[layer_index] = src_file;
159+
}
160+
161+
private:
162+
struct TraceFormat {
163+
TraceOp op;
164+
uint32_t layer_index;
165+
size_t count;
166+
off_t offset;
167+
};
168+
169+
struct TraceHeader {
170+
uint32_t magic = 0;
171+
size_t data_size = 0;
172+
uint32_t checksum = 0;
173+
};
174+
175+
static const int MAX_IO_SIZE = 1024 * 1024;
176+
static const int REPLAY_CONCURRENCY = 16;
177+
static const uint32_t TRACE_MAGIC = 3270449184; // CRC32 of `Container Image Trace Format`
178+
179+
vector<TraceFormat> m_record_array;
180+
queue<TraceFormat> m_replay_queue;
181+
map<uint32_t, IFile *> m_src_files;
182+
vector<photon::join_handle *> m_replay_threads;
183+
photon::join_handle *m_detect_thread = nullptr;
184+
bool m_detect_thread_interruptible = false;
185+
string m_lock_file_path;
186+
string m_ok_file_path;
187+
IFile *m_trace_file = nullptr;
188+
bool m_replay_stopped = false;
189+
bool m_record_stopped = false;
190+
bool m_buffer_released = false;
191+
192+
int dump() {
193+
if (m_trace_file == nullptr) {
194+
return 0;
195+
}
196+
197+
if (access(m_ok_file_path.c_str(), F_OK) != 0) {
198+
unlink(m_ok_file_path.c_str());
199+
}
200+
201+
auto close_trace_file = [&]() {
202+
if (m_trace_file != nullptr) {
203+
m_trace_file->close();
204+
m_trace_file = nullptr;
205+
}
206+
};
207+
DEFER(close_trace_file());
208+
209+
TraceHeader hdr = {};
210+
hdr.magic = TRACE_MAGIC;
211+
hdr.checksum = 0; // calculate and re-write checksum later
212+
hdr.data_size = sizeof(TraceFormat) * m_record_array.size();
213+
214+
ssize_t n_written = m_trace_file->write(&hdr, sizeof(TraceHeader));
215+
if (n_written != sizeof(TraceHeader)) {
216+
m_trace_file->ftruncate(0);
217+
LOG_ERRNO_RETURN(0, -1, "Prefetch: dump write header failed");
218+
}
219+
220+
for (auto &each : m_record_array) {
221+
hdr.checksum = crc32::crc32c_extend(&each, sizeof(TraceFormat), hdr.checksum);
222+
n_written = m_trace_file->write(&each, sizeof(TraceFormat));
223+
if (n_written != sizeof(TraceFormat)) {
224+
m_trace_file->ftruncate(0);
225+
LOG_ERRNO_RETURN(0, -1, "Prefetch: dump write content failed");
226+
}
227+
}
228+
229+
n_written = m_trace_file->pwrite(&hdr, sizeof(TraceHeader), 0);
230+
if (n_written != sizeof(TraceHeader)) {
231+
m_trace_file->ftruncate(0);
232+
LOG_ERRNO_RETURN(0, -1, "Prefetch: dump write header(checksum) failed");
233+
}
234+
235+
unlink(m_lock_file_path.c_str());
236+
237+
int ok_fd = open(m_ok_file_path.c_str(), O_RDWR | O_CREAT | O_TRUNC | O_EXCL, 0666);
238+
if (ok_fd < 0) {
239+
LOG_ERRNO_RETURN(0, -1, "Prefetch: open OK file failed");
240+
}
241+
close(ok_fd);
242+
return 0;
243+
}
244+
245+
int reload(size_t trace_file_size) {
246+
// Reload header
247+
TraceHeader hdr = {};
248+
ssize_t n_read = m_trace_file->read(&hdr, sizeof(TraceHeader));
249+
if (n_read != sizeof(TraceHeader)) {
250+
LOG_ERRNO_RETURN(0, -1, "Prefetch: reload header failed");
251+
}
252+
if (TRACE_MAGIC != hdr.magic) {
253+
LOG_ERROR_RETURN(0, -1, "Prefetch: trace magic mismatch");
254+
}
255+
if (trace_file_size != hdr.data_size + sizeof(TraceHeader)) {
256+
LOG_ERROR_RETURN(0, -1, "Prefetch: trace file size mismatch");
257+
}
258+
259+
// Reload content
260+
uint32_t checksum = 0;
261+
TraceFormat fmt = {};
262+
for (int i = 0; i < hdr.data_size / sizeof(TraceFormat); ++i) {
263+
n_read = m_trace_file->read(&fmt, sizeof(TraceFormat));
264+
if (n_read != sizeof(TraceFormat)) {
265+
LOG_ERRNO_RETURN(0, -1, "Prefetch: reload content failed");
266+
}
267+
checksum = crc32::crc32c_extend(&fmt, sizeof(TraceFormat), checksum);
268+
// Save in memory
269+
m_replay_queue.push(fmt);
270+
}
271+
272+
if (checksum != hdr.checksum) {
273+
queue<TraceFormat> tmp;
274+
m_replay_queue.swap(tmp);
275+
LOG_ERROR_RETURN(0, -1, "Prefetch: reload checksum error");
276+
}
277+
278+
LOG_INFO("Prefetch: Reload ` records", m_replay_queue.size());
279+
return 0;
280+
}
281+
282+
int detect_lock() {
283+
while (!m_record_stopped) {
284+
m_detect_thread_interruptible = true;
285+
int ret = photon::thread_sleep(1);
286+
m_detect_thread_interruptible = false;
287+
if (ret != 0) {
288+
break;
289+
}
290+
if (access(m_lock_file_path.c_str(), F_OK) != 0) {
291+
m_record_stopped = true;
292+
dump();
293+
break;
294+
}
295+
}
296+
return 0;
297+
}
298+
299+
friend LogBuffer &operator<<(LogBuffer &log, const PrefetcherImpl::TraceFormat &f);
300+
};
301+
302+
LogBuffer &operator<<(LogBuffer &log, const PrefetcherImpl::TraceFormat &f) {
303+
return log << "Op " << char(f.op) << ", Count " << f.count << ", Offset " << f.offset
304+
<< ", Layer_index " << f.layer_index;
305+
}
306+
307+
PrefetchFile::PrefetchFile(IFile *src_file, uint32_t layer_index, Prefetcher *prefetcher)
308+
: ForwardFile_Ownership(src_file, true), m_layer_index(layer_index),
309+
m_prefetcher((PrefetcherImpl *)prefetcher) {
310+
if (m_prefetcher->get_mode() == PrefetcherImpl::Mode::Replay) {
311+
m_prefetcher->register_src_file(layer_index, src_file);
312+
}
313+
}
314+
315+
ssize_t PrefetchFile::pread(void *buf, size_t count, off_t offset) {
316+
ssize_t n_read = m_file->pread(buf, count, offset);
317+
if (n_read == (ssize_t)count && m_prefetcher->get_mode() == PrefetcherImpl::Mode::Record) {
318+
m_prefetcher->record(PrefetcherImpl::TraceOp::READ, m_layer_index, count, offset);
319+
}
320+
return n_read;
321+
}
322+
323+
Prefetcher *new_prefetcher(const string &trace_file_path) {
324+
return new PrefetcherImpl(trace_file_path);
325+
}
326+
327+
Prefetcher::Mode Prefetcher::detect_mode(const string &trace_file_path, size_t *file_size) {
328+
struct stat buf = {};
329+
int ret = stat(trace_file_path.c_str(), &buf);
330+
if (file_size != nullptr) {
331+
*file_size = buf.st_size;
332+
}
333+
if (ret != 0) {
334+
return Mode::Disabled;
335+
} else if (buf.st_size == 0) {
336+
return Mode::Record;
337+
} else {
338+
return Mode::Replay;
339+
}
340+
}

0 commit comments

Comments
 (0)