/*
 * Copyright (c) 2011, Google Inc.
 */

#define USE_THE_REPOSITORY_VARIABLE

#include "git-compat-util.h"
#include "bulk-checkin.h"
#include "environment.h"
#include "gettext.h"
#include "hex.h"
#include "lockfile.h"
#include "repository.h"
#include "csum-file.h"
#include "pack.h"
#include "strbuf.h"
#include "tmp-objdir.h"
#include "packfile.h"
#include "object-file.h"
#include "object-store-ll.h"

/*
 * Depth of begin_odb_transaction() nesting; pending pack data and
 * batched fsync work are flushed only when the outermost transaction
 * ends.
 */
static int odb_transaction_nesting;

/*
 * Temporary object directory staging loose objects until a single
 * batched fsync in flush_batch_fsync() makes them durable.
 */
static struct tmp_objdir *bulk_fsync_objdir;

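/*
 * State for the single temporary packfile that bulk check-in streams
 * new blobs into; flushed and reset by flush_bulk_checkin_packfile().
 */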
static struct bulk_checkin_packfile {
	char *pack_tmp_name;
	struct hashfile *f;
	off_t offset;
	struct pack_idx_option pack_idx_opts;

	struct pack_idx_entry **written;
	uint32_t alloc_written;
	uint32_t nr_written;
} bulk_checkin_packfile;

static void finish_tmp_packfile(struct strbuf *basename,
				const char *pack_tmp_name,
				struct pack_idx_entry **written_list,
				uint32_t nr_written,
				struct pack_idx_option *pack_idx_opts,
				unsigned char hash[])
{
	char *idx_tmp_name = NULL;

	stage_tmp_packfiles(basename, pack_tmp_name, written_list, nr_written,
			    NULL, pack_idx_opts, hash, &idx_tmp_name);
	rename_tmp_packfile_idx(basename, &idx_tmp_name);

	free(idx_tmp_name);
}

static void flush_bulk_checkin_packfile(struct bulk_checkin_packfile *state)
{
	unsigned char hash[GIT_MAX_RAWSZ];
	struct strbuf packname = STRBUF_INIT;
	int i;

	if (!state->f)
		return;

	if (state->nr_written == 0) {
		close(state->f->fd);
		free_hashfile(state->f);
		unlink(state->pack_tmp_name);
		goto clear_exit;
	} else if (state->nr_written == 1) {
		finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK,
				  CSUM_HASH_IN_STREAM | CSUM_FSYNC | CSUM_CLOSE);
	} else {
		int fd = finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK, 0);
		fixup_pack_header_footer(fd, hash, state->pack_tmp_name,
					 state->nr_written, hash,
					 state->offset);
		close(fd);
	}

	strbuf_addf(&packname, "%s/pack/pack-%s.", get_object_directory(),
		    hash_to_hex(hash));
	finish_tmp_packfile(&packname, state->pack_tmp_name,
			    state->written, state->nr_written,
			    &state->pack_idx_opts, hash);
	for (i = 0; i < state->nr_written; i++)
		free(state->written[i]);

clear_exit:
	free(state->pack_tmp_name);
	free(state->written);
	memset(state, 0, sizeof(*state));

	strbuf_release(&packname);
	/* Make objects we just wrote available to ourselves */
	reprepare_packed_git(the_repository);
}

/*
 * Cleanup after batch-mode fsync_object_files.
 */
static void flush_batch_fsync(void)
{
	struct strbuf temp_path = STRBUF_INIT;
	struct tempfile *temp;

	if (!bulk_fsync_objdir)
		return;

	/*
	 * Issue a full hardware flush against a temporary file to ensure
	 * that all objects are durable before any renames occur. The code in
	 * fsync_loose_object_bulk_checkin has already issued a writeout
	 * request, but it has not flushed any writeback cache in the storage
	 * hardware or any filesystem logs. This fsync call acts as a barrier
	 * to ensure that the data in each new object file is durable before
	 * the final name is visible.
	 */
	strbuf_addf(&temp_path, "%s/bulk_fsync_XXXXXX", get_object_directory());
	temp = xmks_tempfile(temp_path.buf);
	fsync_or_die(get_tempfile_fd(temp), get_tempfile_path(temp));
	delete_tempfile(&temp);
	strbuf_release(&temp_path);

	/*
	 * Make the object files visible in the primary ODB after their data is
	 * fully durable.
	 */
	tmp_objdir_migrate(bulk_fsync_objdir);
	bulk_fsync_objdir = NULL;
}

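/*
 * Illustrative sketch (not part of upstream bulk-checkin.c): the
 * durability pattern above, reduced to its core. Each file gets only a
 * cheap pagecache writeback request; one full fsync on a scratch file
 * then acts as a single hardware-flush barrier for all of them before
 * any rename publishes a final name. The helper name and parameters
 * here are hypothetical.
 */
#if 0
static void example_batched_durability(int *fds, int nr, int scratch_fd)
{
	int i;

	/* per-file writeback only; no hardware flush yet */
	for (i = 0; i < nr; i++)
		git_fsync(fds[i], FSYNC_WRITEOUT_ONLY);

	/* one full flush acts as a barrier for everything written above */
	fsync_or_die(scratch_fd, "scratch");
}
#endif
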
static int already_written(struct bulk_checkin_packfile *state, struct object_id *oid)
{
	int i;

	/* The object may already exist in the repository */
	if (repo_has_object_file(the_repository, oid))
		return 1;

	/* Might want to keep the list sorted */
	for (i = 0; i < state->nr_written; i++)
		if (oideq(&state->written[i]->oid, oid))
			return 1;

	/* This is a new object we need to keep */
	return 0;
}

/*
 * Read the contents from fd for size bytes, streaming it to the
 * packfile in state while updating the hash in ctx. Signal a failure
 * by returning a negative value when the resulting pack would exceed
 * the pack size limit and this is not the first object in the pack,
 * so that the caller can discard what we wrote from the current pack
 * by truncating it and opening a new one. The caller will then call
 * us again after rewinding the input fd.
 *
 * The already_hashed_to pointer is kept untouched by the caller to
 * make sure we do not hash the same byte when we are called
 * again. This way, the caller does not have to checkpoint its hash
 * status before calling us just in case we ask it to call us again
 * with a new pack.
 */
static int stream_blob_to_pack(struct bulk_checkin_packfile *state,
			       git_hash_ctx *ctx, off_t *already_hashed_to,
			       int fd, size_t size, const char *path,
			       unsigned flags)
{
	git_zstream s;
	unsigned char ibuf[16384];
	unsigned char obuf[16384];
	unsigned hdrlen;
	int status = Z_OK;
	int write_object = (flags & HASH_WRITE_OBJECT);
	off_t offset = 0;

	git_deflate_init(&s, pack_compression_level);

	hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), OBJ_BLOB, size);
	s.next_out = obuf + hdrlen;
	s.avail_out = sizeof(obuf) - hdrlen;

	while (status != Z_STREAM_END) {
		if (size && !s.avail_in) {
			ssize_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
			ssize_t read_result = read_in_full(fd, ibuf, rsize);
			if (read_result < 0)
				die_errno("failed to read from '%s'", path);
			if (read_result != rsize)
				die("failed to read %d bytes from '%s'",
				    (int)rsize, path);
			offset += rsize;
			if (*already_hashed_to < offset) {
				size_t hsize = offset - *already_hashed_to;
				if (rsize < hsize)
					hsize = rsize;
				if (hsize)
					the_hash_algo->update_fn(ctx, ibuf, hsize);
				*already_hashed_to = offset;
			}
			s.next_in = ibuf;
			s.avail_in = rsize;
			size -= rsize;
		}

		status = git_deflate(&s, size ? 0 : Z_FINISH);

		if (!s.avail_out || status == Z_STREAM_END) {
			if (write_object) {
				size_t written = s.next_out - obuf;

				/* would we bust the size limit? */
				if (state->nr_written &&
				    pack_size_limit_cfg &&
				    pack_size_limit_cfg < state->offset + written) {
					git_deflate_abort(&s);
					return -1;
				}

				hashwrite(state->f, obuf, written);
				state->offset += written;
			}
			s.next_out = obuf;
			s.avail_out = sizeof(obuf);
		}

		switch (status) {
		case Z_OK:
		case Z_BUF_ERROR:
		case Z_STREAM_END:
			continue;
		default:
			die("unexpected deflate failure: %d", status);
		}
	}
	git_deflate_end(&s);
	return 0;
}

/* Lazily create backing packfile for the state */
static void prepare_to_stream(struct bulk_checkin_packfile *state,
			      unsigned flags)
{
	if (!(flags & HASH_WRITE_OBJECT) || state->f)
		return;

	state->f = create_tmp_packfile(&state->pack_tmp_name);
	reset_pack_idx_option(&state->pack_idx_opts);

	/*
	 * Pretend we are going to write only one object; if more objects
	 * end up in the pack, flush_bulk_checkin_packfile() corrects the
	 * count via fixup_pack_header_footer().
	 */
	state->offset = write_pack_header(state->f, 1);
	if (!state->offset)
		die_errno("unable to write pack header");
}

static int deflate_blob_to_pack(struct bulk_checkin_packfile *state,
				struct object_id *result_oid,
				int fd, size_t size,
				const char *path, unsigned flags)
{
	off_t seekback, already_hashed_to;
	git_hash_ctx ctx;
	unsigned char obuf[16384];
	unsigned header_len;
	struct hashfile_checkpoint checkpoint = {0};
	struct pack_idx_entry *idx = NULL;

	seekback = lseek(fd, 0, SEEK_CUR);
	if (seekback == (off_t) -1)
		return error("cannot find the current offset");

	header_len = format_object_header((char *)obuf, sizeof(obuf),
					  OBJ_BLOB, size);
	the_hash_algo->init_fn(&ctx);
	the_hash_algo->update_fn(&ctx, obuf, header_len);
	the_hash_algo->init_fn(&checkpoint.ctx);

	/* Note: idx is non-NULL when we are writing */
	if ((flags & HASH_WRITE_OBJECT) != 0)
		CALLOC_ARRAY(idx, 1);

	already_hashed_to = 0;

	while (1) {
		prepare_to_stream(state, flags);
		if (idx) {
			hashfile_checkpoint(state->f, &checkpoint);
			idx->offset = state->offset;
			crc32_begin(state->f);
		}
		if (!stream_blob_to_pack(state, &ctx, &already_hashed_to,
					 fd, size, path, flags))
			break;
		/*
		 * Writing this object to the current pack will make
		 * it too big; we need to truncate it, start a new
		 * pack, and write into it.
		 */
		if (!idx)
			BUG("should not happen");
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		flush_bulk_checkin_packfile(state);
		if (lseek(fd, seekback, SEEK_SET) == (off_t) -1)
			return error("cannot seek back");
	}
	the_hash_algo->final_oid_fn(result_oid, &ctx);
	if (!idx)
		return 0;

	idx->crc32 = crc32_end(state->f);
	if (already_written(state, result_oid)) {
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		free(idx);
	} else {
		oidcpy(&idx->oid, result_oid);
		ALLOC_GROW(state->written,
			   state->nr_written + 1,
			   state->alloc_written);
		state->written[state->nr_written++] = idx;
	}
	return 0;
}

void prepare_loose_object_bulk_checkin(void)
{
	/*
	 * We lazily create the temporary object directory
	 * the first time an object might be added, since
	 * callers may not know whether any objects will be
	 * added at the time they call begin_odb_transaction.
	 */
	if (!odb_transaction_nesting || bulk_fsync_objdir)
		return;

	bulk_fsync_objdir = tmp_objdir_create("bulk-fsync");
	if (bulk_fsync_objdir)
		tmp_objdir_replace_primary_odb(bulk_fsync_objdir, 0);
}

void fsync_loose_object_bulk_checkin(int fd, const char *filename)
{
	/*
	 * If we have an active ODB transaction, we issue a call that
	 * cleans the filesystem page cache but avoids a hardware flush
	 * command. Later on we will issue a single hardware flush
	 * before renaming the objects to their final names as part of
	 * flush_batch_fsync.
	 */
	if (!bulk_fsync_objdir ||
	    git_fsync(fd, FSYNC_WRITEOUT_ONLY) < 0) {
		if (errno == ENOSYS)
			warning(_("core.fsyncMethod = batch is unsupported on this platform"));
		fsync_or_die(fd, filename);
	}
}

int index_blob_bulk_checkin(struct object_id *oid,
			    int fd, size_t size,
			    const char *path, unsigned flags)
{
	int status = deflate_blob_to_pack(&bulk_checkin_packfile, oid, fd, size,
					  path, flags);
	if (!odb_transaction_nesting)
		flush_bulk_checkin_packfile(&bulk_checkin_packfile);
	return status;
}

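/*
 * Illustrative sketch (not part of upstream bulk-checkin.c): one way a
 * caller might stream a single file into the bulk-checkin pack. The
 * helper name and error handling are hypothetical; HASH_WRITE_OBJECT
 * asks for the object to actually be written, and wrapping the call in
 * a transaction defers the pack flush to end_odb_transaction().
 */
#if 0
static int example_add_blob(const char *path)
{
	struct object_id oid;
	struct stat st;
	int ret;
	int fd = open(path, O_RDONLY);

	if (fd < 0)
		return -1;
	if (fstat(fd, &st) < 0) {
		close(fd);
		return -1;
	}

	begin_odb_transaction();
	ret = index_blob_bulk_checkin(&oid, fd, st.st_size, path,
				      HASH_WRITE_OBJECT);
	end_odb_transaction();
	close(fd);
	return ret;
}
#endif
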
void begin_odb_transaction(void)
{
	odb_transaction_nesting += 1;
}

void flush_odb_transaction(void)
{
	flush_batch_fsync();
	flush_bulk_checkin_packfile(&bulk_checkin_packfile);
}

void end_odb_transaction(void)
{
	odb_transaction_nesting -= 1;
	if (odb_transaction_nesting < 0)
		BUG("Unbalanced ODB transaction nesting");

	if (odb_transaction_nesting)
		return;

	flush_odb_transaction();
}
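
/*
 * Illustrative sketch (not part of upstream bulk-checkin.c): how the
 * transaction nesting above behaves. Only the outermost
 * end_odb_transaction() reaches flush_odb_transaction(); an inner
 * begin/end pair merely adjusts the counter, so work batched by a
 * library routine stays deferred while an outer caller still holds
 * the transaction open.
 */
#if 0
static void example_nesting(void)
{
	begin_odb_transaction();	/* nesting: 1 */
	begin_odb_transaction();	/* nesting: 2 */
	/* ... add objects; nothing is flushed yet ... */
	end_odb_transaction();		/* nesting: 1, still no flush */
	end_odb_transaction();		/* nesting: 0, flushes pack and fsync */
}
#endif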