Coverage Report

Created: 2024-09-08 06:23

/src/git/bulk-checkin.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2011, Google Inc.
3
 */
4
5
#define USE_THE_REPOSITORY_VARIABLE
6
7
#include "git-compat-util.h"
8
#include "bulk-checkin.h"
9
#include "environment.h"
10
#include "gettext.h"
11
#include "hex.h"
12
#include "lockfile.h"
13
#include "repository.h"
14
#include "csum-file.h"
15
#include "pack.h"
16
#include "strbuf.h"
17
#include "tmp-objdir.h"
18
#include "packfile.h"
19
#include "object-file.h"
20
#include "object-store-ll.h"
21
22
/*
 * Nesting depth of ODB transactions: incremented by
 * begin_odb_transaction() and decremented by end_odb_transaction().
 * Zero means no transaction is active.
 */
static int odb_transaction_nesting;
23
24
/*
 * Temporary object directory for the active transaction. It is created
 * lazily by prepare_loose_object_bulk_checkin() and reset to NULL by
 * flush_batch_fsync() once it has been migrated.
 */
static struct tmp_objdir *bulk_fsync_objdir;
25
26
/*
 * State of the pack that bulk check-in streams blobs into. The single
 * file-scope instance declared here is the one used by
 * index_blob_bulk_checkin() and flush_odb_transaction().
 */
static struct bulk_checkin_packfile {
27
  /* Name filled in by create_tmp_packfile(); freed when the pack is flushed. */
  char *pack_tmp_name;
28
  /* Open pack stream; NULL until prepare_to_stream() creates it. */
  struct hashfile *f;
29
  /* Starts at the value write_pack_header() returns, then grows by each chunk written. */
  off_t offset;
30
  /* Index options, reset by reset_pack_idx_option() when the pack is created. */
  struct pack_idx_option pack_idx_opts;
31
32
  /* Index entries for the objects written to the current pack. */
  struct pack_idx_entry **written;
33
  /* Allocated capacity of `written`, managed through ALLOC_GROW(). */
  uint32_t alloc_written;
34
  /* Number of valid entries in `written`. */
  uint32_t nr_written;
35
} bulk_checkin_packfile;
36
37
/*
 * Hand a finished temporary pack over to the pack-writing helpers.
 *
 * Every argument is forwarded unchanged to stage_tmp_packfiles(), which
 * reports the name of the temporary index through its last parameter.
 * That name is then given to rename_tmp_packfile_idx() together with
 * `basename`, and released before returning.
 */
static void finish_tmp_packfile(struct strbuf *basename,
                                const char *pack_tmp_name,
                                struct pack_idx_entry **written_list,
                                uint32_t nr_written,
                                struct pack_idx_option *pack_idx_opts,
                                unsigned char hash[])
{
  char *staged_idx = NULL;

  stage_tmp_packfiles(basename, pack_tmp_name,
                      written_list, nr_written,
                      NULL, pack_idx_opts, hash,
                      &staged_idx);
  rename_tmp_packfile_idx(basename, &staged_idx);

  free(staged_idx);
}
52
53
static void flush_bulk_checkin_packfile(struct bulk_checkin_packfile *state)
54
0
{
55
0
  unsigned char hash[GIT_MAX_RAWSZ];
56
0
  struct strbuf packname = STRBUF_INIT;
57
0
  int i;
58
59
0
  if (!state->f)
60
0
    return;
61
62
0
  if (state->nr_written == 0) {
63
0
    close(state->f->fd);
64
0
    free_hashfile(state->f);
65
0
    unlink(state->pack_tmp_name);
66
0
    goto clear_exit;
67
0
  } else if (state->nr_written == 1) {
68
0
    finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK,
69
0
          CSUM_HASH_IN_STREAM | CSUM_FSYNC | CSUM_CLOSE);
70
0
  } else {
71
0
    int fd = finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK, 0);
72
0
    fixup_pack_header_footer(fd, hash, state->pack_tmp_name,
73
0
           state->nr_written, hash,
74
0
           state->offset);
75
0
    close(fd);
76
0
  }
77
78
0
  strbuf_addf(&packname, "%s/pack/pack-%s.", get_object_directory(),
79
0
        hash_to_hex(hash));
80
0
  finish_tmp_packfile(&packname, state->pack_tmp_name,
81
0
          state->written, state->nr_written,
82
0
          &state->pack_idx_opts, hash);
83
0
  for (i = 0; i < state->nr_written; i++)
84
0
    free(state->written[i]);
85
86
0
clear_exit:
87
0
  free(state->pack_tmp_name);
88
0
  free(state->written);
89
0
  memset(state, 0, sizeof(*state));
90
91
0
  strbuf_release(&packname);
92
  /* Make objects we just wrote available to ourselves */
93
0
  reprepare_packed_git(the_repository);
94
0
}
95
96
/*
97
 * Cleanup after batch-mode fsync_object_files.
98
 */
99
static void flush_batch_fsync(void)
100
0
{
101
0
  struct strbuf temp_path = STRBUF_INIT;
102
0
  struct tempfile *temp;
103
104
0
  if (!bulk_fsync_objdir)
105
0
    return;
106
107
  /*
108
   * Issue a full hardware flush against a temporary file to ensure
109
   * that all objects are durable before any renames occur. The code in
110
   * fsync_loose_object_bulk_checkin has already issued a writeout
111
   * request, but it has not flushed any writeback cache in the storage
112
   * hardware or any filesystem logs. This fsync call acts as a barrier
113
   * to ensure that the data in each new object file is durable before
114
   * the final name is visible.
115
   */
116
0
  strbuf_addf(&temp_path, "%s/bulk_fsync_XXXXXX", get_object_directory());
117
0
  temp = xmks_tempfile(temp_path.buf);
118
0
  fsync_or_die(get_tempfile_fd(temp), get_tempfile_path(temp));
119
0
  delete_tempfile(&temp);
120
0
  strbuf_release(&temp_path);
121
122
  /*
123
   * Make the object files visible in the primary ODB after their data is
124
   * fully durable.
125
   */
126
0
  tmp_objdir_migrate(bulk_fsync_objdir);
127
0
  bulk_fsync_objdir = NULL;
128
0
}
129
130
static int already_written(struct bulk_checkin_packfile *state, struct object_id *oid)
131
0
{
132
0
  int i;
133
134
  /* The object may already exist in the repository */
135
0
  if (repo_has_object_file(the_repository, oid))
136
0
    return 1;
137
138
  /* Might want to keep the list sorted */
139
0
  for (i = 0; i < state->nr_written; i++)
140
0
    if (oideq(&state->written[i]->oid, oid))
141
0
      return 1;
142
143
  /* This is a new object we need to keep */
144
0
  return 0;
145
0
}
146
147
/*
148
 * Read the contents from fd for size bytes, streaming it to the
149
 * packfile in state while updating the hash in ctx. Signal a failure
150
 * by returning a negative value when the resulting pack would exceed
151
 * the pack size limit and this is not the first object in the pack,
152
 * so that the caller can discard what we wrote from the current pack
153
 * by truncating it and opening a new one. The caller will then call
154
 * us again after rewinding the input fd.
155
 *
156
 * The already_hashed_to pointer is kept untouched by the caller to
157
 * make sure we do not hash the same byte when we are called
158
 * again. This way, the caller does not have to checkpoint its hash
159
 * status before calling us just in case we ask it to call us again
160
 * with a new pack.
161
 */
162
static int stream_blob_to_pack(struct bulk_checkin_packfile *state,
163
             git_hash_ctx *ctx, off_t *already_hashed_to,
164
             int fd, size_t size, const char *path,
165
             unsigned flags)
166
0
{
167
0
  git_zstream s;
168
0
  unsigned char ibuf[16384];
169
0
  unsigned char obuf[16384];
170
0
  unsigned hdrlen;
171
0
  int status = Z_OK;
172
0
  int write_object = (flags & HASH_WRITE_OBJECT);
173
0
  off_t offset = 0;
174
175
0
  git_deflate_init(&s, pack_compression_level);
176
177
0
  hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), OBJ_BLOB, size);
178
0
  s.next_out = obuf + hdrlen;
179
0
  s.avail_out = sizeof(obuf) - hdrlen;
180
181
0
  while (status != Z_STREAM_END) {
182
0
    if (size && !s.avail_in) {
183
0
      ssize_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
184
0
      ssize_t read_result = read_in_full(fd, ibuf, rsize);
185
0
      if (read_result < 0)
186
0
        die_errno("failed to read from '%s'", path);
187
0
      if (read_result != rsize)
188
0
        die("failed to read %d bytes from '%s'",
189
0
            (int)rsize, path);
190
0
      offset += rsize;
191
0
      if (*already_hashed_to < offset) {
192
0
        size_t hsize = offset - *already_hashed_to;
193
0
        if (rsize < hsize)
194
0
          hsize = rsize;
195
0
        if (hsize)
196
0
          the_hash_algo->update_fn(ctx, ibuf, hsize);
197
0
        *already_hashed_to = offset;
198
0
      }
199
0
      s.next_in = ibuf;
200
0
      s.avail_in = rsize;
201
0
      size -= rsize;
202
0
    }
203
204
0
    status = git_deflate(&s, size ? 0 : Z_FINISH);
205
206
0
    if (!s.avail_out || status == Z_STREAM_END) {
207
0
      if (write_object) {
208
0
        size_t written = s.next_out - obuf;
209
210
        /* would we bust the size limit? */
211
0
        if (state->nr_written &&
212
0
            pack_size_limit_cfg &&
213
0
            pack_size_limit_cfg < state->offset + written) {
214
0
          git_deflate_abort(&s);
215
0
          return -1;
216
0
        }
217
218
0
        hashwrite(state->f, obuf, written);
219
0
        state->offset += written;
220
0
      }
221
0
      s.next_out = obuf;
222
0
      s.avail_out = sizeof(obuf);
223
0
    }
224
225
0
    switch (status) {
226
0
    case Z_OK:
227
0
    case Z_BUF_ERROR:
228
0
    case Z_STREAM_END:
229
0
      continue;
230
0
    default:
231
0
      die("unexpected deflate failure: %d", status);
232
0
    }
233
0
  }
234
0
  git_deflate_end(&s);
235
0
  return 0;
236
0
}
237
238
/* Lazily create backing packfile for the state */
239
static void prepare_to_stream(struct bulk_checkin_packfile *state,
240
            unsigned flags)
241
0
{
242
0
  if (!(flags & HASH_WRITE_OBJECT) || state->f)
243
0
    return;
244
245
0
  state->f = create_tmp_packfile(&state->pack_tmp_name);
246
0
  reset_pack_idx_option(&state->pack_idx_opts);
247
248
  /* Pretend we are going to write only one object */
249
0
  state->offset = write_pack_header(state->f, 1);
250
0
  if (!state->offset)
251
0
    die_errno("unable to write pack header");
252
0
}
253
254
static int deflate_blob_to_pack(struct bulk_checkin_packfile *state,
255
        struct object_id *result_oid,
256
        int fd, size_t size,
257
        const char *path, unsigned flags)
258
0
{
259
0
  off_t seekback, already_hashed_to;
260
0
  git_hash_ctx ctx;
261
0
  unsigned char obuf[16384];
262
0
  unsigned header_len;
263
0
  struct hashfile_checkpoint checkpoint = {0};
264
0
  struct pack_idx_entry *idx = NULL;
265
266
0
  seekback = lseek(fd, 0, SEEK_CUR);
267
0
  if (seekback == (off_t) -1)
268
0
    return error("cannot find the current offset");
269
270
0
  header_len = format_object_header((char *)obuf, sizeof(obuf),
271
0
            OBJ_BLOB, size);
272
0
  the_hash_algo->init_fn(&ctx);
273
0
  the_hash_algo->update_fn(&ctx, obuf, header_len);
274
0
  the_hash_algo->init_fn(&checkpoint.ctx);
275
276
  /* Note: idx is non-NULL when we are writing */
277
0
  if ((flags & HASH_WRITE_OBJECT) != 0)
278
0
    CALLOC_ARRAY(idx, 1);
279
280
0
  already_hashed_to = 0;
281
282
0
  while (1) {
283
0
    prepare_to_stream(state, flags);
284
0
    if (idx) {
285
0
      hashfile_checkpoint(state->f, &checkpoint);
286
0
      idx->offset = state->offset;
287
0
      crc32_begin(state->f);
288
0
    }
289
0
    if (!stream_blob_to_pack(state, &ctx, &already_hashed_to,
290
0
           fd, size, path, flags))
291
0
      break;
292
    /*
293
     * Writing this object to the current pack will make
294
     * it too big; we need to truncate it, start a new
295
     * pack, and write into it.
296
     */
297
0
    if (!idx)
298
0
      BUG("should not happen");
299
0
    hashfile_truncate(state->f, &checkpoint);
300
0
    state->offset = checkpoint.offset;
301
0
    flush_bulk_checkin_packfile(state);
302
0
    if (lseek(fd, seekback, SEEK_SET) == (off_t) -1)
303
0
      return error("cannot seek back");
304
0
  }
305
0
  the_hash_algo->final_oid_fn(result_oid, &ctx);
306
0
  if (!idx)
307
0
    return 0;
308
309
0
  idx->crc32 = crc32_end(state->f);
310
0
  if (already_written(state, result_oid)) {
311
0
    hashfile_truncate(state->f, &checkpoint);
312
0
    state->offset = checkpoint.offset;
313
0
    free(idx);
314
0
  } else {
315
0
    oidcpy(&idx->oid, result_oid);
316
0
    ALLOC_GROW(state->written,
317
0
         state->nr_written + 1,
318
0
         state->alloc_written);
319
0
    state->written[state->nr_written++] = idx;
320
0
  }
321
0
  return 0;
322
0
}
323
324
void prepare_loose_object_bulk_checkin(void)
325
0
{
326
  /*
327
   * We lazily create the temporary object directory
328
   * the first time an object might be added, since
329
   * callers may not know whether any objects will be
330
   * added at the time they call begin_odb_transaction.
331
   */
332
0
  if (!odb_transaction_nesting || bulk_fsync_objdir)
333
0
    return;
334
335
0
  bulk_fsync_objdir = tmp_objdir_create("bulk-fsync");
336
0
  if (bulk_fsync_objdir)
337
0
    tmp_objdir_replace_primary_odb(bulk_fsync_objdir, 0);
338
0
}
339
340
void fsync_loose_object_bulk_checkin(int fd, const char *filename)
341
0
{
342
  /*
343
   * If we have an active ODB transaction, we issue a call that
344
   * cleans the filesystem page cache but avoids a hardware flush
345
   * command. Later on we will issue a single hardware flush
346
   * before renaming the objects to their final names as part of
347
   * flush_batch_fsync.
348
   */
349
0
  if (!bulk_fsync_objdir ||
350
0
      git_fsync(fd, FSYNC_WRITEOUT_ONLY) < 0) {
351
0
    if (errno == ENOSYS)
352
0
      warning(_("core.fsyncMethod = batch is unsupported on this platform"));
353
0
    fsync_or_die(fd, filename);
354
0
  }
355
0
}
356
357
int index_blob_bulk_checkin(struct object_id *oid,
358
          int fd, size_t size,
359
          const char *path, unsigned flags)
360
0
{
361
0
  int status = deflate_blob_to_pack(&bulk_checkin_packfile, oid, fd, size,
362
0
            path, flags);
363
0
  if (!odb_transaction_nesting)
364
0
    flush_bulk_checkin_packfile(&bulk_checkin_packfile);
365
0
  return status;
366
0
}
367
368
void begin_odb_transaction(void)
369
0
{
370
0
  odb_transaction_nesting += 1;
371
0
}
372
373
void flush_odb_transaction(void)
374
0
{
375
0
  flush_batch_fsync();
376
0
  flush_bulk_checkin_packfile(&bulk_checkin_packfile);
377
0
}
378
379
void end_odb_transaction(void)
380
0
{
381
0
  odb_transaction_nesting -= 1;
382
0
  if (odb_transaction_nesting < 0)
383
0
    BUG("Unbalanced ODB transaction nesting");
384
385
0
  if (odb_transaction_nesting)
386
0
    return;
387
388
0
  flush_odb_transaction();
389
0
}