Coverage Report

Created: 2026-02-14 06:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/perfetto/src/profiling/memory/heapprofd_producer.cc
Line
Count
Source
1
/*
2
 * Copyright (C) 2018 The Android Open Source Project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include "src/profiling/memory/heapprofd_producer.h"
18
19
#include <signal.h>
20
#include <sys/stat.h>
21
#include <sys/types.h>
22
#include <unistd.h>
23
24
#include <algorithm>
25
#include <cinttypes>
26
#include <functional>
27
#include <optional>
28
#include <string>
29
30
#include "perfetto/base/compiler.h"
31
#include "perfetto/base/logging.h"
32
#include "perfetto/ext/base/file_utils.h"
33
#include "perfetto/ext/base/string_splitter.h"
34
#include "perfetto/ext/base/string_utils.h"
35
#include "perfetto/ext/base/thread_task_runner.h"
36
#include "perfetto/ext/base/watchdog_posix.h"
37
#include "perfetto/ext/tracing/core/basic_types.h"
38
#include "perfetto/ext/tracing/core/trace_writer.h"
39
#include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
40
#include "perfetto/tracing/core/data_source_config.h"
41
#include "perfetto/tracing/core/data_source_descriptor.h"
42
#include "perfetto/tracing/core/forward_decls.h"
43
#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
44
#include "src/profiling/common/producer_support.h"
45
#include "src/profiling/common/profiler_guardrails.h"
46
#include "src/profiling/memory/shared_ring_buffer.h"
47
#include "src/profiling/memory/unwound_messages.h"
48
#include "src/profiling/memory/wire_protocol.h"
49
50
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
51
#include <sys/system_properties.h>
52
#endif
53
54
namespace perfetto {
55
namespace profiling {
56
namespace {
57
using ::perfetto::protos::pbzero::ProfilePacket;
58
59
constexpr char kHeapprofdDataSource[] = "android.heapprofd";
60
constexpr size_t kUnwinderThreads = 5;
61
62
constexpr uint32_t kInitialConnectionBackoffMs = 100;
63
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
64
constexpr uint32_t kGuardrailIntervalMs = 30 * 1000;
65
66
constexpr uint64_t kDefaultShmemSize = 8 * 1048576;  // ~8 MB
67
constexpr uint64_t kMaxShmemSize = 500 * 1048576;    // ~500 MB
68
69
// Constants specified by bionic, hardcoded here for simplicity.
70
constexpr int kProfilingSignal = __SIGRTMIN + 4;
71
constexpr int kHeapprofdSignalValue = 0;
72
73
std::vector<UnwindingWorker> MakeUnwindingWorkers(HeapprofdProducer* delegate,
74
0
                                                  size_t n) {
75
0
  std::vector<UnwindingWorker> ret;
76
0
  for (size_t i = 0; i < n; ++i) {
77
0
    ret.emplace_back(delegate,
78
0
                     base::ThreadTaskRunner::CreateAndStart("heapprofdunwind"));
79
0
  }
80
0
  return ret;
81
0
}
82
83
bool ConfigTargetsProcess(const HeapprofdConfig& cfg,
84
                          const Process& proc,
85
0
                          const std::vector<std::string>& normalized_cmdlines) {
86
0
  if (cfg.all())
87
0
    return true;
88
89
0
  const auto& pids = cfg.pid();
90
0
  if (std::find(pids.cbegin(), pids.cend(), static_cast<uint64_t>(proc.pid)) !=
91
0
      pids.cend()) {
92
0
    return true;
93
0
  }
94
95
0
  if (std::find(normalized_cmdlines.cbegin(), normalized_cmdlines.cend(),
96
0
                proc.cmdline) != normalized_cmdlines.cend()) {
97
0
    return true;
98
0
  }
99
0
  return false;
100
0
}
101
102
0
bool IsFile(int fd, const char* fn) {
103
0
  struct stat fdstat;
104
0
  struct stat fnstat;
105
0
  if (fstat(fd, &fdstat) == -1) {
106
0
    PERFETTO_PLOG("fstat");
107
0
    return false;
108
0
  }
109
0
  if (lstat(fn, &fnstat) == -1) {
110
0
    PERFETTO_PLOG("lstat");
111
0
    return false;
112
0
  }
113
0
  return fdstat.st_ino == fnstat.st_ino;
114
0
}
115
116
// Maps a SharedRingBuffer::ErrorState onto the matching ClientError value of
// ProfilePacket::ProcessHeapSamples. The switch deliberately has no default
// label so that every enumerator has to be listed explicitly.
protos::pbzero::ProfilePacket::ProcessHeapSamples::ClientError
ErrorStateToProto(SharedRingBuffer::ErrorState state) {
  using Samples = protos::pbzero::ProfilePacket::ProcessHeapSamples;
  switch (state) {
    case SharedRingBuffer::kNoError:
      return Samples::CLIENT_ERROR_NONE;
    case SharedRingBuffer::kHitTimeout:
      return Samples::CLIENT_ERROR_HIT_TIMEOUT;
    case SharedRingBuffer::kInvalidStackBounds:
      return Samples::CLIENT_ERROR_INVALID_STACK_BOUNDS;
  }
}
130
131
}  // namespace
132
133
bool HeapprofdConfigToClientConfiguration(
134
    const HeapprofdConfig& heapprofd_config,
135
0
    ClientConfiguration* cli_config) {
136
0
  cli_config->default_interval = heapprofd_config.sampling_interval_bytes();
137
0
  cli_config->block_client = heapprofd_config.block_client();
138
0
  cli_config->disable_fork_teardown = heapprofd_config.disable_fork_teardown();
139
0
  cli_config->disable_vfork_detection =
140
0
      heapprofd_config.disable_vfork_detection();
141
0
  cli_config->block_client_timeout_us =
142
0
      heapprofd_config.block_client_timeout_us();
143
0
  cli_config->all_heaps = heapprofd_config.all_heaps();
144
0
  cli_config->adaptive_sampling_shmem_threshold =
145
0
      heapprofd_config.adaptive_sampling_shmem_threshold();
146
0
  cli_config->adaptive_sampling_max_sampling_interval_bytes =
147
0
      heapprofd_config.adaptive_sampling_max_sampling_interval_bytes();
148
0
  size_t n = 0;
149
0
  const std::vector<std::string>& exclude_heaps =
150
0
      heapprofd_config.exclude_heaps();
151
  // heaps[i] and heaps_interval[i] represent that the heap named in heaps[i]
152
  // should be sampled with sampling interval of heap_interval[i].
153
0
  std::vector<std::string> heaps = heapprofd_config.heaps();
154
0
  std::vector<uint64_t> heap_intervals =
155
0
      heapprofd_config.heap_sampling_intervals();
156
0
  if (heaps.empty() && !cli_config->all_heaps) {
157
0
    heaps.push_back("libc.malloc");
158
0
  }
159
160
0
  if (heap_intervals.empty()) {
161
0
    heap_intervals.assign(heaps.size(),
162
0
                          heapprofd_config.sampling_interval_bytes());
163
0
  }
164
0
  if (heap_intervals.size() != heaps.size()) {
165
0
    PERFETTO_ELOG("heap_sampling_intervals and heaps length mismatch.");
166
0
    return false;
167
0
  }
168
0
  if (std::find(heap_intervals.begin(), heap_intervals.end(), 0u) !=
169
0
      heap_intervals.end()) {
170
0
    PERFETTO_ELOG("zero sampling interval.");
171
0
    return false;
172
0
  }
173
0
  if (!exclude_heaps.empty()) {
174
    // For disabled heaps, we add explicit entries but with sampling interval
175
    // 0. The consumer of the sampling intervals in ClientConfiguration,
176
    // GetSamplingInterval in wire_protocol.h, uses 0 to signal a heap is
177
    // disabled, either because it isn't enabled (all_heaps is not set, and the
178
    // heap isn't named), or because we explicitely set it here.
179
0
    heaps.insert(heaps.end(), exclude_heaps.cbegin(), exclude_heaps.cend());
180
0
    heap_intervals.insert(heap_intervals.end(), exclude_heaps.size(), 0u);
181
0
  }
182
0
  if (heaps.size() > base::ArraySize(cli_config->heaps)) {
183
0
    heaps.resize(base::ArraySize(cli_config->heaps));
184
0
    PERFETTO_ELOG("Too many heaps requested. Truncating.");
185
0
  }
186
0
  for (size_t i = 0; i < heaps.size(); ++i) {
187
0
    const std::string& heap = heaps[i];
188
0
    const uint64_t interval = heap_intervals[i];
189
    // -1 for the \0 byte.
190
0
    if (heap.size() > HEAPPROFD_HEAP_NAME_SZ - 1) {
191
0
      PERFETTO_ELOG("Invalid heap name %s (larger than %d)", heap.c_str(),
192
0
                    HEAPPROFD_HEAP_NAME_SZ - 1);
193
0
      continue;
194
0
    }
195
0
    base::StringCopy(&cli_config->heaps[n].name[0], heap.c_str(),
196
0
                     sizeof(cli_config->heaps[n].name));
197
0
    cli_config->heaps[n].interval = interval;
198
0
    n++;
199
0
  }
200
0
  cli_config->num_heaps = n;
201
0
  return true;
202
0
}
203
204
// We create kUnwinderThreads unwinding threads. Bookkeeping is done on the main
205
// thread.
206
HeapprofdProducer::HeapprofdProducer(HeapprofdMode mode,
207
                                     base::TaskRunner* task_runner,
208
                                     bool exit_when_done)
209
0
    : task_runner_(task_runner),
210
0
      mode_(mode),
211
0
      exit_when_done_(exit_when_done),
212
0
      socket_delegate_(this),
213
0
      weak_factory_(this),
214
0
      unwinding_workers_(MakeUnwindingWorkers(this, kUnwinderThreads)) {
215
0
  CheckDataSourceCpuTask();
216
0
  CheckDataSourceMemoryTask();
217
0
}
218
219
0
// Defaulted destructor: no teardown beyond destroying the members.
HeapprofdProducer::~HeapprofdProducer() = default;
220
221
void HeapprofdProducer::SetTargetProcess(pid_t target_pid,
222
0
                                         std::string target_cmdline) {
223
0
  target_process_.pid = target_pid;
224
0
  target_process_.cmdline = target_cmdline;
225
0
}
226
227
0
void HeapprofdProducer::SetDataSourceCallback(std::function<void()> fn) {
228
0
  data_source_callback_ = fn;
229
0
}
230
231
0
// Wraps the already-connected descriptor |fd| in a UnixSocket (unix family,
// stream type) that reports to |socket_delegate_|, and hands it to
// HandleClientConnection together with |target_process_|. Expected to be used
// in child mode only (debug-checked).
void HeapprofdProducer::AdoptSocket(base::ScopedFile fd) {
  PERFETTO_DCHECK(mode_ == HeapprofdMode::kChild);

  auto adopted_sock = base::UnixSocket::AdoptConnected(
      std::move(fd), &socket_delegate_, task_runner_, base::SockFamily::kUnix,
      base::SockType::kStream);
  HandleClientConnection(std::move(adopted_sock), target_process_);
}
239
240
0
void HeapprofdProducer::OnConnect() {
241
0
  PERFETTO_DCHECK(state_ == kConnecting);
242
0
  state_ = kConnected;
243
0
  ResetConnectionBackoff();
244
0
  PERFETTO_LOG("Connected to the service, mode [%s].",
245
0
               mode_ == HeapprofdMode::kCentral ? "central" : "child");
246
247
0
  DataSourceDescriptor desc;
248
0
  desc.set_name(kHeapprofdDataSource);
249
0
  desc.set_will_notify_on_stop(true);
250
0
  endpoint_->RegisterDataSource(desc);
251
0
}
252
253
0
void HeapprofdProducer::OnDisconnect() {
254
0
  PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
255
0
  PERFETTO_LOG("Disconnected from tracing service");
256
257
  // Do not attempt to reconnect if we're a process-private process, just quit.
258
0
  if (exit_when_done_) {
259
0
    TerminateProcess(/*exit_status=*/1);  // does not return
260
0
  }
261
262
  // Central mode - attempt to reconnect.
263
0
  auto weak_producer = weak_factory_.GetWeakPtr();
264
0
  if (state_ == kConnected)
265
0
    return task_runner_->PostTask([weak_producer] {
266
0
      if (!weak_producer)
267
0
        return;
268
0
      weak_producer->Restart();
269
0
    });
270
271
0
  state_ = kNotConnected;
272
0
  IncreaseConnectionBackoff();
273
0
  task_runner_->PostDelayedTask(
274
0
      [weak_producer] {
275
0
        if (!weak_producer)
276
0
          return;
277
0
        weak_producer->ConnectService();
278
0
      },
279
0
      connection_backoff_ms_);
280
0
}
281
282
0
void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
283
0
  PERFETTO_DCHECK(state_ == kNotStarted);
284
0
  state_ = kNotConnected;
285
286
0
  ResetConnectionBackoff();
287
0
  producer_sock_name_ = socket_name;
288
0
  ConnectService();
289
0
}
290
291
0
void HeapprofdProducer::ConnectService() {
292
0
  SetProducerEndpoint(ProducerIPCClient::Connect(
293
0
      producer_sock_name_, this, "android.heapprofd", task_runner_));
294
0
}
295
296
void HeapprofdProducer::SetProducerEndpoint(
297
0
    std::unique_ptr<TracingService::ProducerEndpoint> endpoint) {
298
0
  PERFETTO_DCHECK(state_ == kNotConnected || state_ == kNotStarted);
299
0
  state_ = kConnecting;
300
0
  endpoint_ = std::move(endpoint);
301
0
}
302
303
0
void HeapprofdProducer::IncreaseConnectionBackoff() {
304
0
  connection_backoff_ms_ *= 2;
305
0
  if (connection_backoff_ms_ > kMaxConnectionBackoffMs)
306
0
    connection_backoff_ms_ = kMaxConnectionBackoffMs;
307
0
}
308
309
0
void HeapprofdProducer::ResetConnectionBackoff() {
310
0
  connection_backoff_ms_ = kInitialConnectionBackoffMs;
311
0
}
312
313
0
void HeapprofdProducer::Restart() {
314
  // We lost the connection with the tracing service. At this point we need
315
  // to reset all the data sources. Trying to handle that manually is going to
316
  // be error prone. What we do here is simply destroy the instance and
317
  // recreate it again.
318
319
  // Oneshot producer should not attempt restarts.
320
0
  if (exit_when_done_)
321
0
    PERFETTO_FATAL("Attempting to restart a one shot producer.");
322
323
0
  HeapprofdMode mode = mode_;
324
0
  base::TaskRunner* task_runner = task_runner_;
325
0
  const char* socket_name = producer_sock_name_;
326
0
  const bool exit_when_done = exit_when_done_;
327
328
  // Invoke destructor and then the constructor again.
329
0
  this->~HeapprofdProducer();
330
0
  new (this) HeapprofdProducer(mode, task_runner, exit_when_done);
331
332
0
  ConnectWithRetries(socket_name);
333
0
}
334
335
// TODO(rsavitski): would be cleaner to shut down the event loop instead
336
// (letting main exit). One test-friendly approach is to supply a shutdown
337
// callback in the constructor.
338
__attribute__((noreturn)) void HeapprofdProducer::TerminateProcess(
339
0
    int exit_status) {
340
0
  PERFETTO_CHECK(mode_ == HeapprofdMode::kChild);
341
0
  PERFETTO_LOG("Shutting down child heapprofd (status %d).", exit_status);
342
0
  exit(exit_status);
343
0
}
344
345
0
// No-op: nothing is done when this callback fires.
void HeapprofdProducer::OnTracingSetup() {}
346
347
void HeapprofdProducer::WriteRejectedConcurrentSession(BufferID buffer_id,
348
0
                                                       pid_t pid) {
349
0
  auto trace_writer = endpoint_->CreateTraceWriter(buffer_id);
350
0
  auto trace_packet = trace_writer->NewTracePacket();
351
0
  trace_packet->set_timestamp(
352
0
      static_cast<uint64_t>(base::GetBootTimeNs().count()));
353
0
  auto profile_packet = trace_packet->set_profile_packet();
354
0
  auto process_dump = profile_packet->add_process_dumps();
355
0
  process_dump->set_pid(static_cast<uint64_t>(pid));
356
0
  process_dump->set_rejected_concurrent(true);
357
0
  trace_packet->Finalize();
358
0
  trace_writer->Flush();
359
0
}
360
361
void HeapprofdProducer::SetupDataSource(DataSourceInstanceID id,
362
0
                                        const DataSourceConfig& ds_config) {
363
0
  if (ds_config.session_initiator() ==
364
0
      DataSourceConfig::SESSION_INITIATOR_TRUSTED_SYSTEM) {
365
0
    PERFETTO_LOG("Setting up datasource: statsd initiator.");
366
0
  } else {
367
0
    PERFETTO_LOG("Setting up datasource: non-statsd initiator.");
368
0
  }
369
0
  if (mode_ == HeapprofdMode::kChild && ds_config.enable_extra_guardrails()) {
370
0
    PERFETTO_ELOG("enable_extra_guardrails is not supported on user.");
371
0
    return;
372
0
  }
373
374
0
  HeapprofdConfig heapprofd_config;
375
0
  heapprofd_config.ParseFromString(ds_config.heapprofd_config_raw());
376
377
0
  if (heapprofd_config.all() && !heapprofd_config.pid().empty())
378
0
    PERFETTO_ELOG("No point setting all and pid");
379
0
  if (heapprofd_config.all() && !heapprofd_config.process_cmdline().empty())
380
0
    PERFETTO_ELOG("No point setting all and process_cmdline");
381
382
0
  if (ds_config.name() != kHeapprofdDataSource) {
383
0
    PERFETTO_DLOG("Invalid data source name.");
384
0
    return;
385
0
  }
386
387
0
  if (data_sources_.find(id) != data_sources_.end()) {
388
0
    PERFETTO_ELOG("Received duplicated data source instance id: %" PRIu64, id);
389
0
    return;
390
0
  }
391
392
0
  std::optional<std::vector<std::string>> normalized_cmdlines =
393
0
      NormalizeCmdlines(heapprofd_config.process_cmdline());
394
0
  if (!normalized_cmdlines.has_value()) {
395
0
    PERFETTO_ELOG("Rejecting data source due to invalid cmdline in config.");
396
0
    return;
397
0
  }
398
399
  // Child mode is only interested in the first data source matching the
400
  // already-connected process.
401
0
  if (mode_ == HeapprofdMode::kChild) {
402
0
    if (!ConfigTargetsProcess(heapprofd_config, target_process_,
403
0
                              normalized_cmdlines.value())) {
404
0
      PERFETTO_DLOG("Child mode skipping setup of unrelated data source.");
405
0
      return;
406
0
    }
407
408
0
    if (!data_sources_.empty()) {
409
0
      PERFETTO_LOG("Child mode skipping concurrent data source.");
410
411
      // Manually write one ProfilePacket about the rejected session.
412
0
      auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
413
0
      WriteRejectedConcurrentSession(buffer_id, target_process_.pid);
414
0
      return;
415
0
    }
416
0
  }
417
418
0
  std::optional<uint64_t> start_cputime_sec;
419
0
  if (heapprofd_config.max_heapprofd_cpu_secs() > 0) {
420
0
    start_cputime_sec = GetCputimeSecForCurrentProcess();
421
422
0
    if (!start_cputime_sec) {
423
0
      PERFETTO_ELOG("Failed to enforce CPU guardrail. Rejecting config.");
424
0
      return;
425
0
    }
426
0
  }
427
428
0
  auto buffer_id = static_cast<BufferID>(ds_config.target_buffer());
429
0
  DataSource data_source(endpoint_->CreateTraceWriter(buffer_id));
430
0
  data_source.id = id;
431
0
  auto& cli_config = data_source.client_configuration;
432
0
  if (!HeapprofdConfigToClientConfiguration(heapprofd_config, &cli_config))
433
0
    return;
434
0
  data_source.config = heapprofd_config;
435
0
  data_source.ds_config = ds_config;
436
0
  data_source.normalized_cmdlines = std::move(normalized_cmdlines.value());
437
0
  data_source.stop_timeout_ms = ds_config.stop_timeout_ms()
438
0
                                    ? ds_config.stop_timeout_ms()
439
0
                                    : 5000 /* kDataSourceStopTimeoutMs */;
440
0
  data_source.guardrail_config.cpu_start_secs = start_cputime_sec;
441
0
  data_source.guardrail_config.memory_guardrail_kb =
442
0
      heapprofd_config.max_heapprofd_memory_kb();
443
0
  data_source.guardrail_config.cpu_guardrail_sec =
444
0
      heapprofd_config.max_heapprofd_cpu_secs();
445
446
0
  InterningOutputTracker::WriteFixedInterningsPacket(
447
0
      data_source.trace_writer.get(),
448
0
      protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
449
0
  data_sources_.emplace(id, std::move(data_source));
450
0
  PERFETTO_DLOG("Set up data source.");
451
452
0
  if (mode_ == HeapprofdMode::kChild && data_source_callback_)
453
0
    (*data_source_callback_)();
454
0
}
455
456
0
bool HeapprofdProducer::IsPidProfiled(pid_t pid) {
457
0
  return std::any_of(
458
0
      data_sources_.cbegin(), data_sources_.cend(),
459
0
      [pid](const std::pair<const DataSourceInstanceID, DataSource>& p) {
460
0
        const DataSource& ds = p.second;
461
0
        return ds.process_states.count(pid) > 0;
462
0
      });
463
0
}
464
465
0
// Sets the startup properties requested by |data_source|: the "all" property
// if the config sets all(), plus one property per normalized cmdline. The
// handles returned by |properties_| are stored in |data_source->properties|.
void HeapprofdProducer::SetStartupProperties(DataSource* data_source) {
  auto& held_properties = data_source->properties;

  if (data_source->config.all()) {
    held_properties.emplace_back(properties_.SetAll());
  }

  // Each cmdline is copied on purpose so that it can be moved into
  // SetProperty.
  for (std::string cmdline : data_source->normalized_cmdlines) {
    held_properties.emplace_back(properties_.SetProperty(std::move(cmdline)));
  }
}
474
475
0
// Collects the pids targeted by |data_source| (all profilable pids if all()
// is set, the explicitly listed pids, and the pids matching the normalized
// cmdlines), optionally filters them by the anonymous memory threshold, and
// sends the profiling signal to each of them.
//
// Pids that already have a process state in some data source are not
// signalled; they are recorded in |data_source->rejected_pids| instead. The
// remaining pids end up in |data_source->signaled_pids|.
void HeapprofdProducer::SignalRunningProcesses(DataSource* data_source) {
  const HeapprofdConfig& cfg = data_source->config;

  std::set<pid_t> pids;
  if (cfg.all()) {
    FindAllProfilablePids(&pids);
  }
  for (uint64_t configured_pid : cfg.pid()) {
    pids.emplace(static_cast<pid_t>(configured_pid));
  }

  if (!data_source->normalized_cmdlines.empty()) {
    FindPidsForCmdlines(data_source->normalized_cmdlines, &pids);
  }

  if (cfg.min_anonymous_memory_kb() > 0) {
    RemoveUnderAnonThreshold(cfg.min_anonymous_memory_kb(), &pids);
  }

  auto it = pids.cbegin();
  while (it != pids.cend()) {
    const pid_t pid = *it;
    if (IsPidProfiled(pid)) {
      PERFETTO_LOG("Rejecting concurrent session for %" PRIdMAX,
                   static_cast<intmax_t>(pid));
      data_source->rejected_pids.emplace(pid);
      it = pids.erase(it);
      continue;
    }

    PERFETTO_DLOG("Sending signal: %d (si_value: %d) to pid: %d",
                  kProfilingSignal, kHeapprofdSignalValue, pid);
    union sigval signal_value;
    signal_value.sival_int = kHeapprofdSignalValue;
    // A failure to signal is only logged (debug); the pid is still kept.
    if (sigqueue(pid, kProfilingSignal, signal_value) != 0) {
      PERFETTO_DPLOG("sigqueue");
    }
    ++it;
  }
  data_source->signaled_pids = std::move(pids);
}
511
512
void HeapprofdProducer::StartDataSource(DataSourceInstanceID id,
513
0
                                        const DataSourceConfig&) {
514
0
  PERFETTO_DLOG("Starting data source %" PRIu64, id);
515
516
0
  auto it = data_sources_.find(id);
517
0
  if (it == data_sources_.end()) {
518
    // This is expected in child heapprofd, where we reject uninteresting data
519
    // sources in SetupDataSource.
520
0
    if (mode_ == HeapprofdMode::kCentral) {
521
0
      PERFETTO_ELOG("Received invalid data source instance to start: %" PRIu64,
522
0
                    id);
523
0
    }
524
0
    return;
525
0
  }
526
527
0
  DataSource& data_source = it->second;
528
0
  if (data_source.started) {
529
0
    PERFETTO_ELOG("Trying to start already started data-source: %" PRIu64, id);
530
0
    return;
531
0
  }
532
0
  const HeapprofdConfig& heapprofd_config = data_source.config;
533
534
  // Central daemon - set system properties for any targets that start later,
535
  // and signal already-running targets to start the profiling client.
536
0
  if (mode_ == HeapprofdMode::kCentral) {
537
0
    if (!heapprofd_config.no_startup())
538
0
      SetStartupProperties(&data_source);
539
0
    if (!heapprofd_config.no_running())
540
0
      SignalRunningProcesses(&data_source);
541
0
  }
542
543
0
  const auto continuous_dump_config = heapprofd_config.continuous_dump_config();
544
0
  uint32_t dump_interval = continuous_dump_config.dump_interval_ms();
545
0
  if (dump_interval) {
546
0
    data_source.dump_interval_ms = dump_interval;
547
0
    auto weak_producer = weak_factory_.GetWeakPtr();
548
0
    task_runner_->PostDelayedTask(
549
0
        [weak_producer, id] {
550
0
          if (!weak_producer)
551
0
            return;
552
0
          weak_producer->DoDrainAndContinuousDump(id);
553
0
        },
554
0
        continuous_dump_config.dump_phase_ms());
555
0
  }
556
0
  data_source.started = true;
557
0
  PERFETTO_DLOG("Started DataSource");
558
0
}
559
560
0
// Returns the unwinding worker assigned to |pid|: workers are selected by the
// pid modulo kUnwinderThreads, so a given pid always maps to the same worker.
UnwindingWorker& HeapprofdProducer::UnwinderForPID(pid_t pid) {
  const auto worker_index = static_cast<uint64_t>(pid) % kUnwinderThreads;
  return unwinding_workers_[worker_index];
}
563
564
0
// Stops data source |id|. For an unknown id the service is told right away
// that the data source has stopped (logged as an error in central mode only).
// Otherwise the data source is marked as stopped and its shutdown is started.
void HeapprofdProducer::StopDataSource(DataSourceInstanceID id) {
  auto ds_it = data_sources_.find(id);
  if (ds_it == data_sources_.end()) {
    endpoint_->NotifyDataSourceStopped(id);
    if (mode_ == HeapprofdMode::kCentral) {
      PERFETTO_ELOG("Trying to stop non existing data source: %" PRIu64, id);
    }
    return;
  }

  PERFETTO_LOG("Stopping data source %" PRIu64, id);

  DataSource& ds = ds_it->second;
  ds.was_stopped = true;
  ShutdownDataSource(&ds);
}
579
580
0
// Begins tearing down |data_source|.
//
// If nothing is left to wait for, MaybeFinishDataSource finishes the job
// immediately. Otherwise any rejected pids are reported in one ProfilePacket,
// every connected process is asked to disconnect, and a timeout task is
// posted that purges the remaining processes and forces the teardown if the
// data source still exists after |stop_timeout_ms|.
void HeapprofdProducer::ShutdownDataSource(DataSource* data_source) {
  data_source->shutting_down = true;
  // If no processes connected, or all of them have already disconnected
  // (and have been dumped) and no PIDs have been rejected,
  // MaybeFinishDataSource can tear down the data source.
  if (MaybeFinishDataSource(data_source))
    return;

  if (!data_source->rejected_pids.empty()) {
    auto packet = data_source->trace_writer->NewTracePacket();
    ProfilePacket* profile_packet = packet->set_profile_packet();
    for (pid_t rejected_pid : data_source->rejected_pids) {
      ProfilePacket::ProcessHeapSamples* dump =
          profile_packet->add_process_dumps();
      dump->set_pid(static_cast<uint64_t>(rejected_pid));
      dump->set_rejected_concurrent(true);
    }
    packet->Finalize();
    data_source->rejected_pids.clear();
    // With the rejected pids reported, the data source may now be done.
    if (MaybeFinishDataSource(data_source))
      return;
  }

  for (const auto& pid_and_state : data_source->process_states) {
    const pid_t pid = pid_and_state.first;
    UnwinderForPID(pid).PostDisconnectSocket(pid);
  }

  auto ds_id = data_source->id;
  auto weak_producer = weak_factory_.GetWeakPtr();
  auto on_timeout = [weak_producer, ds_id] {
    if (!weak_producer)
      return;
    auto ds_it = weak_producer->data_sources_.find(ds_id);
    if (ds_it == weak_producer->data_sources_.end())
      return;

    PERFETTO_ELOG("Final dump timed out.");
    DataSource& ds = ds_it->second;

    for (const auto& pid_and_state : ds.process_states) {
      const pid_t pid = pid_and_state.first;
      weak_producer->UnwinderForPID(pid).PostPurgeProcess(pid);
    }
    // Do not dump any stragglers, just trigger the Flush and tear down
    // the data source.
    ds.process_states.clear();
    ds.rejected_pids.clear();
    PERFETTO_CHECK(weak_producer->MaybeFinishDataSource(&ds));
  };
  task_runner_->PostDelayedTask(std::move(on_timeout),
                                data_source->stop_timeout_ms);
}
632
633
0
// One round of the continuous dump for data source |id|: posts a free-drain
// request for every process of the data source (counting them in
// |pending_free_drains|) and then attempts the dump. Does nothing if the data
// source no longer exists.
void HeapprofdProducer::DoDrainAndContinuousDump(DataSourceInstanceID id) {
  auto ds_it = data_sources_.find(id);
  if (ds_it == data_sources_.end())
    return;

  DataSource& ds = ds_it->second;
  PERFETTO_DCHECK(ds.pending_free_drains == 0);

  for (const auto& pid_and_state : ds.process_states) {
    const pid_t pid = pid_and_state.first;
    UnwinderForPID(pid).PostDrainFree(ds.id, pid);
    ds.pending_free_drains++;
  }

  // In case there are no pending free drains, dump immediately.
  DoContinuousDump(&ds);
}
648
649
0
// Dumps all processes of |ds| and schedules the next drain-and-dump round
// after |ds->dump_interval_ms|. Does nothing while free drains are still
// pending for the data source.
void HeapprofdProducer::DoContinuousDump(DataSource* ds) {
  if (ds->pending_free_drains != 0)
    return;

  DumpProcessesInDataSource(ds);

  auto ds_id = ds->id;
  auto weak_producer = weak_factory_.GetWeakPtr();
  task_runner_->PostDelayedTask(
      [weak_producer, ds_id] {
        if (weak_producer)
          weak_producer->DoDrainAndContinuousDump(ds_id);
      },
      ds->dump_interval_ms);
}
665
666
void HeapprofdProducer::PostDrainDone(UnwindingWorker*,
667
0
                                      DataSourceInstanceID ds_id) {
668
0
  auto weak_this = weak_factory_.GetWeakPtr();
669
0
  task_runner_->PostTask([weak_this, ds_id] {
670
0
    if (weak_this)
671
0
      weak_this->DrainDone(ds_id);
672
0
  });
673
0
}
674
675
0
// Accounts for one finished free drain of data source |ds_id| and retries the
// continuous dump. Ignored if the data source no longer exists.
void HeapprofdProducer::DrainDone(DataSourceInstanceID ds_id) {
  auto ds_it = data_sources_.find(ds_id);
  if (ds_it == data_sources_.end())
    return;

  DataSource& ds = ds_it->second;
  ds.pending_free_drains--;
  DoContinuousDump(&ds);
}
684
685
// static
686
void HeapprofdProducer::SetStats(
687
    protos::pbzero::ProfilePacket::ProcessStats* stats,
688
0
    const ProcessState& process_state) {
689
0
  stats->set_unwinding_errors(process_state.unwinding_errors);
690
0
  stats->set_heap_samples(process_state.heap_samples);
691
0
  stats->set_map_reparses(process_state.map_reparses);
692
0
  stats->set_total_unwinding_time_us(process_state.total_unwinding_time_us);
693
0
  stats->set_client_spinlock_blocked_us(
694
0
      process_state.client_spinlock_blocked_us);
695
0
  auto* unwinding_hist = stats->set_unwinding_time_us();
696
0
  for (const auto& p : process_state.unwinding_time_us.GetData()) {
697
0
    auto* bucket = unwinding_hist->add_buckets();
698
0
    if (p.first == LogHistogram::kMaxBucket)
699
0
      bucket->set_max_bucket(true);
700
0
    else
701
0
      bucket->set_upper_limit(p.first);
702
0
    bucket->set_count(p.second);
703
0
  }
704
0
}
705
706
void HeapprofdProducer::DumpProcessState(DataSource* data_source,
707
                                         pid_t pid,
708
0
                                         ProcessState* process_state) {
709
0
  for (auto& heap_id_and_heap_info : process_state->heap_infos) {
710
0
    ProcessState::HeapInfo& heap_info = heap_id_and_heap_info.second;
711
712
0
    bool from_startup = data_source->signaled_pids.find(pid) ==
713
0
                        data_source->signaled_pids.cend();
714
715
0
    auto new_heapsamples = [pid, from_startup, process_state, data_source,
716
0
                            &heap_info](
717
0
                               ProfilePacket::ProcessHeapSamples* proto) {
718
0
      proto->set_pid(static_cast<uint64_t>(pid));
719
0
      proto->set_timestamp(heap_info.heap_tracker.dump_timestamp());
720
0
      proto->set_from_startup(from_startup);
721
0
      proto->set_disconnected(process_state->disconnected);
722
0
      proto->set_buffer_overran(process_state->error_state ==
723
0
                                SharedRingBuffer::kHitTimeout);
724
0
      proto->set_client_error(ErrorStateToProto(process_state->error_state));
725
0
      proto->set_buffer_corrupted(process_state->buffer_corrupted);
726
0
      proto->set_hit_guardrail(data_source->hit_guardrail);
727
0
      if (!heap_info.heap_name.empty())
728
0
        proto->set_heap_name(heap_info.heap_name.c_str());
729
0
      proto->set_sampling_interval_bytes(heap_info.sampling_interval);
730
0
      proto->set_orig_sampling_interval_bytes(heap_info.orig_sampling_interval);
731
0
      auto* stats = proto->set_stats();
732
0
      SetStats(stats, *process_state);
733
0
    };
734
735
0
    DumpState dump_state(data_source->trace_writer.get(),
736
0
                         std::move(new_heapsamples),
737
0
                         &data_source->intern_state);
738
739
0
    heap_info.heap_tracker.GetCallstackAllocations(
740
0
        [&dump_state,
741
0
         &data_source](const HeapTracker::CallstackAllocations& alloc) {
742
0
          dump_state.WriteAllocation(alloc, data_source->config.dump_at_max());
743
0
        });
744
0
    dump_state.DumpCallstacks(&callsites_);
745
0
  }
746
0
}
747
748
0
// Dumps the state of every process currently tracked by |ds|.
void HeapprofdProducer::DumpProcessesInDataSource(DataSource* ds) {
  for (auto& [pid, process_state] : ds->process_states) {
    DumpProcessState(ds, pid, &process_state);
  }
}
756
757
0
void HeapprofdProducer::DumpAll() {
758
0
  PERFETTO_LOG("Received signal. Dumping all data sources.");
759
0
  for (auto& id_and_data_source : data_sources_)
760
0
    DumpProcessesInDataSource(&id_and_data_source.second);
761
0
}
762
763
void HeapprofdProducer::Flush(FlushRequestID flush_id,
764
                              const DataSourceInstanceID* ids,
765
                              size_t num_ids,
766
0
                              FlushFlags) {
767
0
  size_t& flush_in_progress = flushes_in_progress_[flush_id];
768
0
  PERFETTO_DCHECK(flush_in_progress == 0);
769
0
  flush_in_progress = num_ids;
770
0
  for (size_t i = 0; i < num_ids; ++i) {
771
0
    auto it = data_sources_.find(ids[i]);
772
0
    if (it == data_sources_.end()) {
773
0
      PERFETTO_ELOG("Trying to flush unknown data-source %" PRIu64, ids[i]);
774
0
      flush_in_progress--;
775
0
      continue;
776
0
    }
777
0
    DataSource& data_source = it->second;
778
0
    auto weak_producer = weak_factory_.GetWeakPtr();
779
780
0
    auto callback = [weak_producer, flush_id] {
781
0
      if (weak_producer)
782
        // Reposting because this task runner could be on a different thread
783
        // than the IPC task runner.
784
0
        return weak_producer->task_runner_->PostTask([weak_producer, flush_id] {
785
0
          if (weak_producer)
786
0
            return weak_producer->FinishDataSourceFlush(flush_id);
787
0
        });
788
0
    };
789
0
    data_source.trace_writer->Flush(std::move(callback));
790
0
  }
791
0
  if (flush_in_progress == 0) {
792
0
    endpoint_->NotifyFlushComplete(flush_id);
793
0
    flushes_in_progress_.erase(flush_id);
794
0
  }
795
0
}
796
797
0
// Records that one trace writer finished flushing for |flush_id|. When the
// last outstanding writer reports in, the endpoint is told that the flush is
// complete and the counter entry is removed. An unknown |flush_id| is logged
// and otherwise ignored.
void HeapprofdProducer::FinishDataSourceFlush(FlushRequestID flush_id) {
  auto entry = flushes_in_progress_.find(flush_id);
  if (entry == flushes_in_progress_.end()) {
    PERFETTO_ELOG("FinishDataSourceFlush id invalid: %" PRIu64, flush_id);
    return;
  }

  size_t& remaining = entry->second;
  --remaining;
  if (remaining != 0)
    return;

  endpoint_->NotifyFlushComplete(flush_id);
  flushes_in_progress_.erase(flush_id);
}
809
810
0
// Called when a pending client socket disconnects. Looks up the pending
// process by the socket's peer PID and removes it, but only if the stored
// socket is |self|. A disconnect for a PID with no pending entry is logged.
void HeapprofdProducer::SocketDelegate::OnDisconnect(base::UnixSocket* self) {
  auto& pending = producer_->pending_processes_;
  auto found = pending.find(self->peer_pid_linux());
  if (found == pending.end()) {
    PERFETTO_ELOG("Unexpected disconnect.");
    return;
  }

  const bool entry_owns_self = found->second.sock.get() == self;
  if (entry_owns_self)
    pending.erase(found);
}
820
821
void HeapprofdProducer::SocketDelegate::OnNewIncomingConnection(
822
    base::UnixSocket*,
823
0
    std::unique_ptr<base::UnixSocket> new_connection) {
824
0
  Process peer_process;
825
0
  peer_process.pid = new_connection->peer_pid_linux();
826
0
  if (!GetCmdlineForPID(peer_process.pid, &peer_process.cmdline))
827
0
    PERFETTO_PLOG("Failed to get cmdline for %d", peer_process.pid);
828
829
0
  producer_->HandleClientConnection(std::move(new_connection), peer_process);
830
0
}
831
832
void HeapprofdProducer::SocketDelegate::OnDataAvailable(
833
0
    base::UnixSocket* self) {
834
0
  auto it = producer_->pending_processes_.find(self->peer_pid_linux());
835
0
  if (it == producer_->pending_processes_.end()) {
836
0
    PERFETTO_ELOG("Unexpected data.");
837
0
    return;
838
0
  }
839
840
0
  PendingProcess& pending_process = it->second;
841
842
0
  base::ScopedFile fds[kHandshakeSize];
843
0
  char buf[1];
844
0
  self->Receive(buf, sizeof(buf), fds, base::ArraySize(fds));
845
846
0
  static_assert(kHandshakeSize == 2, "change if and else if below.");
847
0
  if (fds[kHandshakeMaps] && fds[kHandshakeMem]) {
848
0
    auto ds_it =
849
0
        producer_->data_sources_.find(pending_process.data_source_instance_id);
850
0
    if (ds_it == producer_->data_sources_.end()) {
851
0
      producer_->pending_processes_.erase(it);
852
0
      return;
853
0
    }
854
0
    DataSource& data_source = ds_it->second;
855
856
0
    if (data_source.shutting_down) {
857
0
      producer_->pending_processes_.erase(it);
858
0
      PERFETTO_LOG("Got handshake for DS that is shutting down. Rejecting.");
859
0
      return;
860
0
    }
861
862
0
    std::string maps_file =
863
0
        "/proc/" + std::to_string(self->peer_pid_linux()) + "/maps";
864
0
    if (!IsFile(*fds[kHandshakeMaps], maps_file.c_str())) {
865
0
      producer_->pending_processes_.erase(it);
866
0
      PERFETTO_ELOG("Received invalid maps FD.");
867
0
      return;
868
0
    }
869
870
0
    std::string mem_file =
871
0
        "/proc/" + std::to_string(self->peer_pid_linux()) + "/mem";
872
0
    if (!IsFile(*fds[kHandshakeMem], mem_file.c_str())) {
873
0
      producer_->pending_processes_.erase(it);
874
0
      PERFETTO_ELOG("Received invalid mem FD.");
875
0
      return;
876
0
    }
877
878
0
    data_source.process_states.emplace(
879
0
        std::piecewise_construct, std::forward_as_tuple(self->peer_pid_linux()),
880
0
        std::forward_as_tuple(&producer_->callsites_,
881
0
                              data_source.config.dump_at_max()));
882
883
0
    PERFETTO_DLOG("%d: Received FDs.", self->peer_pid_linux());
884
0
    int raw_fd = pending_process.shmem.fd();
885
    // TODO(fmayer): Full buffer could deadlock us here.
886
0
    if (!self->Send(&data_source.client_configuration,
887
0
                    sizeof(data_source.client_configuration), &raw_fd, 1)) {
888
      // If Send fails, the socket will have been Shutdown, and the raw socket
889
      // closed.
890
0
      producer_->pending_processes_.erase(it);
891
0
      return;
892
0
    }
893
894
0
    UnwindingWorker::HandoffData handoff_data;
895
0
    handoff_data.data_source_instance_id =
896
0
        pending_process.data_source_instance_id;
897
0
    handoff_data.sock = self->ReleaseSocket();
898
0
    handoff_data.maps_fd = std::move(fds[kHandshakeMaps]);
899
0
    handoff_data.mem_fd = std::move(fds[kHandshakeMem]);
900
0
    handoff_data.shmem = std::move(pending_process.shmem);
901
0
    handoff_data.client_config = data_source.client_configuration;
902
0
    handoff_data.stream_allocations = data_source.config.stream_allocations();
903
904
0
    producer_->UnwinderForPID(self->peer_pid_linux())
905
0
        .PostHandoffSocket(std::move(handoff_data));
906
0
    producer_->pending_processes_.erase(it);
907
0
  } else if (fds[kHandshakeMaps] || fds[kHandshakeMem]) {
908
0
    PERFETTO_ELOG("%d: Received partial FDs.", self->peer_pid_linux());
909
0
    producer_->pending_processes_.erase(it);
910
0
  } else {
911
0
    PERFETTO_ELOG("%d: Received no FDs.", self->peer_pid_linux());
912
0
  }
913
0
}
914
915
// Returns the first data source (in data_sources_ iteration order) whose
// config targets |proc|, or nullptr if none does.
HeapprofdProducer::DataSource* HeapprofdProducer::GetDataSourceForProcess(
    const Process& proc) {
  DataSource* match = nullptr;
  for (auto& kv : data_sources_) {
    DataSource& candidate = kv.second;
    if (!ConfigTargetsProcess(candidate.config, proc,
                              candidate.normalized_cmdlines)) {
      continue;
    }
    match = &candidate;
    break;
  }
  return match;
}
924
925
void HeapprofdProducer::RecordOtherSourcesAsRejected(DataSource* active_ds,
926
0
                                                     const Process& proc) {
927
0
  for (auto& ds_id_and_datasource : data_sources_) {
928
0
    DataSource& ds = ds_id_and_datasource.second;
929
0
    if (&ds != active_ds &&
930
0
        ConfigTargetsProcess(ds.config, proc, ds.normalized_cmdlines))
931
0
      ds.rejected_pids.emplace(proc.pid);
932
0
  }
933
0
}
934
935
void HeapprofdProducer::HandleClientConnection(
936
    std::unique_ptr<base::UnixSocket> new_connection,
937
0
    Process process) {
938
0
  DataSource* data_source = GetDataSourceForProcess(process);
939
0
  if (!data_source) {
940
0
    PERFETTO_LOG("No data source found.");
941
0
    return;
942
0
  }
943
0
  RecordOtherSourcesAsRejected(data_source, process);
944
945
  // In fork mode, right now we check whether the target is not profileable
946
  // in the client, because we cannot read packages.list there.
947
0
  if (mode_ == HeapprofdMode::kCentral &&
948
0
      !CanProfile(data_source->ds_config, new_connection->peer_uid_posix(),
949
0
                  data_source->config.target_installed_by())) {
950
0
    PERFETTO_ELOG("%d (%s) is not profileable.", process.pid,
951
0
                  process.cmdline.c_str());
952
0
    return;
953
0
  }
954
955
0
  uint64_t shmem_size = data_source->config.shmem_size_bytes();
956
0
  if (!shmem_size)
957
0
    shmem_size = kDefaultShmemSize;
958
0
  if (shmem_size > kMaxShmemSize) {
959
0
    PERFETTO_LOG("Specified shared memory size of %" PRIu64
960
0
                 " exceeds maximum size of %" PRIu64 ". Reducing.",
961
0
                 shmem_size, kMaxShmemSize);
962
0
    shmem_size = kMaxShmemSize;
963
0
  }
964
965
0
  auto shmem = SharedRingBuffer::Create(static_cast<size_t>(shmem_size));
966
0
  if (!shmem || !shmem->is_valid()) {
967
0
    PERFETTO_LOG("Failed to create shared memory.");
968
0
    return;
969
0
  }
970
971
0
  pid_t peer_pid = new_connection->peer_pid_linux();
972
0
  if (peer_pid != process.pid) {
973
0
    PERFETTO_ELOG("Invalid PID connected.");
974
0
    return;
975
0
  }
976
977
0
  PendingProcess pending_process;
978
0
  pending_process.sock = std::move(new_connection);
979
0
  pending_process.data_source_instance_id = data_source->id;
980
0
  pending_process.shmem = std::move(*shmem);
981
0
  pending_processes_.emplace(peer_pid, std::move(pending_process));
982
0
}
983
984
void HeapprofdProducer::PostAllocRecord(
985
    UnwindingWorker* worker,
986
0
    std::unique_ptr<AllocRecord> alloc_rec) {
987
  // Once we can use C++14, this should be std::moved into the lambda instead.
988
0
  auto* raw_alloc_rec = alloc_rec.release();
989
0
  auto weak_this = weak_factory_.GetWeakPtr();
990
0
  task_runner_->PostTask([weak_this, raw_alloc_rec, worker] {
991
0
    std::unique_ptr<AllocRecord> unique_alloc_ref =
992
0
        std::unique_ptr<AllocRecord>(raw_alloc_rec);
993
0
    if (weak_this) {
994
0
      weak_this->HandleAllocRecord(unique_alloc_ref.get());
995
0
      worker->ReturnAllocRecord(std::move(unique_alloc_ref));
996
0
    }
997
0
  });
998
0
}
999
1000
void HeapprofdProducer::PostFreeRecord(UnwindingWorker*,
1001
0
                                       std::vector<FreeRecord> free_recs) {
1002
  // Once we can use C++14, this should be std::moved into the lambda instead.
1003
0
  std::vector<FreeRecord>* raw_free_recs =
1004
0
      new std::vector<FreeRecord>(std::move(free_recs));
1005
0
  auto weak_this = weak_factory_.GetWeakPtr();
1006
0
  task_runner_->PostTask([weak_this, raw_free_recs] {
1007
0
    if (weak_this) {
1008
0
      for (FreeRecord& free_rec : *raw_free_recs)
1009
0
        weak_this->HandleFreeRecord(std::move(free_rec));
1010
0
    }
1011
0
    delete raw_free_recs;
1012
0
  });
1013
0
}
1014
1015
void HeapprofdProducer::PostHeapNameRecord(UnwindingWorker*,
1016
0
                                           HeapNameRecord rec) {
1017
0
  auto weak_this = weak_factory_.GetWeakPtr();
1018
0
  task_runner_->PostTask([weak_this, rec] {
1019
0
    if (weak_this)
1020
0
      weak_this->HandleHeapNameRecord(rec);
1021
0
  });
1022
0
}
1023
1024
void HeapprofdProducer::PostSocketDisconnected(UnwindingWorker*,
1025
                                               DataSourceInstanceID ds_id,
1026
                                               pid_t pid,
1027
0
                                               SharedRingBuffer::Stats stats) {
1028
0
  auto weak_this = weak_factory_.GetWeakPtr();
1029
0
  task_runner_->PostTask([weak_this, ds_id, pid, stats] {
1030
0
    if (weak_this)
1031
0
      weak_this->HandleSocketDisconnected(ds_id, pid, stats);
1032
0
  });
1033
0
}
1034
1035
0
// Processes one unwound allocation sample.
//
// Records for an unknown data source or PID are logged and dropped. When the
// data source streams allocations, the sample is written straight to the
// trace as a streaming_allocation packet and nothing else happens. Otherwise
// frames whose mapping name starts with one of the configured
// skip_symbol_prefix entries get their function name replaced, the
// per-process statistics are updated and the sample is recorded in the heap
// tracker for the sample's heap id.
void HeapprofdProducer::HandleAllocRecord(AllocRecord* alloc_rec) {
  auto ds_it = data_sources_.find(alloc_rec->data_source_instance_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_LOG("Invalid data source in alloc record.");
    return;
  }
  DataSource& ds = ds_it->second;

  auto state_it = ds.process_states.find(alloc_rec->pid);
  if (state_it == ds.process_states.end()) {
    PERFETTO_LOG("Invalid PID in alloc record.");
    return;
  }

  const AllocMetadata& meta = alloc_rec->alloc_metadata;

  if (ds.config.stream_allocations()) {
    auto packet = ds.trace_writer->NewTracePacket();
    auto* streamed = packet->set_streaming_allocation();
    streamed->add_address(meta.alloc_address);
    streamed->add_size(meta.alloc_size);
    streamed->add_sample_size(meta.sample_size);
    streamed->add_clock_monotonic_coarse_timestamp(
        meta.clock_monotonic_coarse_timestamp);
    streamed->add_heap_id(meta.heap_id);
    streamed->add_sequence_number(meta.sequence_number);
    return;
  }

  const auto& prefixes = ds.config.skip_symbol_prefix();
  if (!prefixes.empty()) {
    for (unwindstack::FrameData& frame : alloc_rec->frames) {
      // Frames without mapping information cannot be matched against a
      // prefix.
      if (frame.map_info == nullptr)
        continue;
      const std::string& map_name = frame.map_info->name();
      const bool filtered =
          std::any_of(prefixes.cbegin(), prefixes.cend(),
                      [&map_name](const std::string& prefix) {
                        return base::StartsWith(map_name, prefix);
                      });
      if (filtered)
        frame.function_name = "FILTERED";
    }
  }

  ProcessState& state = state_it->second;
  HeapTracker& tracker = state.GetHeapTracker(meta.heap_id);

  if (alloc_rec->error)
    state.unwinding_errors++;
  state.heap_samples++;
  state.unwinding_time_us.Add(alloc_rec->unwinding_time_us);
  state.total_unwinding_time_us += alloc_rec->unwinding_time_us;

  if (alloc_rec->reparsed_map) {
    state.map_reparses++;
    // abspc may no longer refer to the same functions after the maps were
    // reparsed, so the cached frames have to go.
    tracker.ClearFrameCache();
  }

  tracker.RecordMalloc(alloc_rec->frames, alloc_rec->build_ids,
                       meta.alloc_address, meta.sample_size, meta.alloc_size,
                       meta.sequence_number,
                       meta.clock_monotonic_coarse_timestamp);
}
1102
1103
0
// Processes one free record.
//
// Records for an unknown data source or PID are logged and dropped. When the
// data source streams allocations, the free is written to the trace as a
// streaming_free packet; otherwise it is recorded in the heap tracker for the
// entry's heap id.
void HeapprofdProducer::HandleFreeRecord(FreeRecord free_rec) {
  auto ds_it = data_sources_.find(free_rec.data_source_instance_id);
  if (ds_it == data_sources_.end()) {
    PERFETTO_LOG("Invalid data source in free record.");
    return;
  }
  DataSource& ds = ds_it->second;

  auto state_it = ds.process_states.find(free_rec.pid);
  if (state_it == ds.process_states.end()) {
    PERFETTO_LOG("Invalid PID in free record.");
    return;
  }

  const FreeEntry& entry = free_rec.entry;

  if (ds.config.stream_allocations()) {
    auto packet = ds.trace_writer->NewTracePacket();
    auto* streamed = packet->set_streaming_free();
    streamed->add_address(entry.addr);
    streamed->add_heap_id(entry.heap_id);
    streamed->add_sequence_number(entry.sequence_number);
    return;
  }

  ProcessState& state = state_it->second;
  state.GetHeapTracker(entry.heap_id)
      .RecordFree(entry.addr, entry.sequence_number, 0);
}
1132
1133
0
// Processes a heap name record sent by a client.
//
// Records for an unknown data source or PID are logged and dropped. A
// non-empty heap name is stored in the HeapInfo for the record's heap id
// (heap id 0 is rejected; replacing a different existing name is logged). A
// non-zero sample interval is stored as the heap's current sampling interval;
// if the heap had no interval before, it is also remembered as the original
// one.
void HeapprofdProducer::HandleHeapNameRecord(HeapNameRecord rec) {
  auto it = data_sources_.find(rec.data_source_instance_id);
  if (it == data_sources_.end()) {
    PERFETTO_LOG("Invalid data source in heap name record.");
    return;
  }

  DataSource& ds = it->second;
  auto process_state_it = ds.process_states.find(rec.pid);
  if (process_state_it == ds.process_states.end()) {
    PERFETTO_LOG("Invalid PID in heap name record.");
    return;
  }

  ProcessState& process_state = process_state_it->second;
  const HeapName& entry = rec.entry;
  if (entry.heap_name[0] != '\0') {
    if (entry.heap_id == 0) {
      PERFETTO_ELOG("Invalid zero heap ID.");
      return;
    }
    // Convert once and reuse the string for both the comparison and the
    // assignment.
    std::string heap_name = entry.heap_name;
    ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
    if (!hi.heap_name.empty() && hi.heap_name != heap_name) {
      PERFETTO_ELOG("Overriding heap name %s with %s", hi.heap_name.c_str(),
                    heap_name.c_str());
    }
    hi.heap_name = std::move(heap_name);
  }
  if (entry.sample_interval != 0) {
    ProcessState::HeapInfo& hi = process_state.GetHeapInfo(entry.heap_id);
    // The first interval seen for a heap is kept as the original one.
    if (!hi.sampling_interval)
      hi.orig_sampling_interval = entry.sample_interval;
    hi.sampling_interval = entry.sample_interval;
  }
}
1169
1170
0
void HeapprofdProducer::TerminateWhenDone() {
1171
0
  if (data_sources_.empty())
1172
0
    TerminateProcess(0);
1173
0
  exit_when_done_ = true;
1174
0
}
1175
1176
0
// Starts tearing down |ds| if it is shutting down and no longer tracks any
// process states or rejected PIDs.
//
// Returns false, doing nothing, if any of those conditions does not hold.
// Otherwise flushes the data source's trace writer and returns true. The rest
// of the teardown runs in the flush callback: the endpoint is notified if the
// data source was stopped, the data source is erased, and, if termination
// has been requested via exit_when_done_, the process is terminated.
bool HeapprofdProducer::MaybeFinishDataSource(DataSource* ds) {
  if (!ds->process_states.empty() || !ds->rejected_pids.empty() ||
      !ds->shutting_down) {
    return false;
  }

  // |ds| is erased inside the callback, so copy everything it needs now.
  bool was_stopped = ds->was_stopped;
  DataSourceInstanceID ds_id = ds->id;
  auto weak_producer = weak_factory_.GetWeakPtr();
  ds->trace_writer->Flush([weak_producer, ds_id, was_stopped] {
    if (!weak_producer)
      return;

    if (was_stopped)
      weak_producer->endpoint_->NotifyDataSourceStopped(ds_id);
    weak_producer->data_sources_.erase(ds_id);

    // Read the flag now rather than when the flush was issued. If
    // TerminateWhenDone() ran while the flush was pending, data_sources_ was
    // still non-empty, so it only set exit_when_done_; a value captured
    // earlier would miss that request and the process would never exit.
    if (weak_producer->exit_when_done_) {
      // Post this as a task to allow NotifyDataSourceStopped to post tasks.
      weak_producer->task_runner_->PostTask([weak_producer] {
        if (!weak_producer)
          return;
        weak_producer->TerminateProcess(
            /*exit_status=*/0);  // does not return
      });
    }
  });
  return true;
}
1206
1207
void HeapprofdProducer::HandleSocketDisconnected(
1208
    DataSourceInstanceID ds_id,
1209
    pid_t pid,
1210
0
    SharedRingBuffer::Stats stats) {
1211
0
  auto it = data_sources_.find(ds_id);
1212
0
  if (it == data_sources_.end())
1213
0
    return;
1214
0
  DataSource& ds = it->second;
1215
1216
0
  auto process_state_it = ds.process_states.find(pid);
1217
0
  if (process_state_it == ds.process_states.end()) {
1218
0
    PERFETTO_ELOG("Unexpected disconnect from %d", pid);
1219
0
    return;
1220
0
  }
1221
1222
0
  PERFETTO_LOG("%d disconnected from heapprofd (ds shutting down: %d).", pid,
1223
0
               ds.shutting_down);
1224
1225
0
  ProcessState& process_state = process_state_it->second;
1226
0
  process_state.disconnected = !ds.shutting_down;
1227
0
  process_state.error_state = stats.error_state;
1228
0
  process_state.client_spinlock_blocked_us = stats.client_spinlock_blocked_us;
1229
0
  process_state.buffer_corrupted =
1230
0
      stats.num_writes_corrupt > 0 || stats.num_reads_corrupt > 0;
1231
1232
0
  DumpProcessState(&ds, pid, &process_state);
1233
0
  ds.process_states.erase(pid);
1234
0
  MaybeFinishDataSource(&ds);
1235
0
}
1236
1237
0
void HeapprofdProducer::CheckDataSourceCpuTask() {
1238
0
  auto weak_producer = weak_factory_.GetWeakPtr();
1239
0
  task_runner_->PostDelayedTask(
1240
0
      [weak_producer] {
1241
0
        if (!weak_producer)
1242
0
          return;
1243
0
        weak_producer->CheckDataSourceCpuTask();
1244
0
      },
1245
0
      kGuardrailIntervalMs);
1246
1247
0
  ProfilerCpuGuardrails gr;
1248
0
  for (auto& p : data_sources_) {
1249
0
    DataSource& ds = p.second;
1250
0
    if (gr.IsOverCpuThreshold(ds.guardrail_config)) {
1251
0
      ds.hit_guardrail = true;
1252
0
      PERFETTO_LOG("Data source %" PRIu64 " hit CPU guardrail. Shutting down.",
1253
0
                   ds.id);
1254
0
      ShutdownDataSource(&ds);
1255
0
    }
1256
0
  }
1257
0
}
1258
1259
0
void HeapprofdProducer::CheckDataSourceMemoryTask() {
1260
0
  auto weak_producer = weak_factory_.GetWeakPtr();
1261
0
  task_runner_->PostDelayedTask(
1262
0
      [weak_producer] {
1263
0
        if (!weak_producer)
1264
0
          return;
1265
0
        weak_producer->CheckDataSourceMemoryTask();
1266
0
      },
1267
0
      kGuardrailIntervalMs);
1268
0
  ProfilerMemoryGuardrails gr;
1269
0
  for (auto& p : data_sources_) {
1270
0
    DataSource& ds = p.second;
1271
0
    if (gr.IsOverMemoryThreshold(ds.guardrail_config)) {
1272
0
      ds.hit_guardrail = true;
1273
      PERFETTO_LOG("Data source %" PRIu64
1274
0
                   " hit memory guardrail. Shutting down.",
1275
0
                   ds.id);
1276
0
      ShutdownDataSource(&ds);
1277
0
    }
1278
0
  }
1279
0
}
1280
1281
}  // namespace profiling
1282
}  // namespace perfetto