Coverage Report

Created: 2024-09-03 06:23

/src/brpc/src/butil/threading/simple_thread.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
#include "butil/threading/simple_thread.h"
6
7
#include "butil/logging.h"
8
#include "butil/strings/string_number_conversions.h"
9
#include "butil/threading/platform_thread.h"
10
#include "butil/threading/thread_restrictions.h"
11
12
namespace butil {
13
14
SimpleThread::SimpleThread(const std::string& name_prefix)
15
    : name_prefix_(name_prefix), name_(name_prefix),
16
0
      thread_(), event_(true, false), tid_(0), joined_(false) {
17
0
}
18
19
SimpleThread::SimpleThread(const std::string& name_prefix,
20
                           const Options& options)
21
    : name_prefix_(name_prefix), name_(name_prefix), options_(options),
22
0
      thread_(), event_(true, false), tid_(0), joined_(false) {
23
0
}
24
25
0
SimpleThread::~SimpleThread() {
26
0
  DCHECK(HasBeenStarted()) << "SimpleThread was never started.";
27
0
  DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed.";
28
0
}
29
30
0
void SimpleThread::Start() {
31
0
  DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times.";
32
0
  bool success = PlatformThread::Create(options_.stack_size(), this, &thread_);
33
0
  DCHECK(success);
34
0
  butil::ThreadRestrictions::ScopedAllowWait allow_wait;
35
0
  event_.Wait();  // Wait for the thread to complete initialization.
36
0
}
37
38
0
void SimpleThread::Join() {
39
0
  DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread.";
40
0
  DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
41
0
  PlatformThread::Join(thread_);
42
0
  joined_ = true;
43
0
}
44
45
0
bool SimpleThread::HasBeenStarted() {
46
0
  butil::ThreadRestrictions::ScopedAllowWait allow_wait;
47
0
  return event_.IsSignaled();
48
0
}
49
50
0
void SimpleThread::ThreadMain() {
51
0
  tid_ = PlatformThread::CurrentId();
52
  // Construct our full name of the form "name_prefix_/TID".
53
0
  name_.push_back('/');
54
0
  name_.append(IntToString(tid_));
55
0
  PlatformThread::SetName(name_.c_str());
56
57
  // We've initialized our new thread, signal that we're done to Start().
58
0
  event_.Signal();
59
60
0
  Run();
61
0
}
62
63
DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
64
                                           const std::string& name_prefix)
65
    : SimpleThread(name_prefix),
66
0
      delegate_(delegate) {
67
0
}
68
69
DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
70
                                           const std::string& name_prefix,
71
                                           const Options& options)
72
    : SimpleThread(name_prefix, options),
73
0
      delegate_(delegate) {
74
0
}
75
76
DelegateSimpleThread::~DelegateSimpleThread() {
77
}
78
79
0
void DelegateSimpleThread::Run() {
80
0
  DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
81
0
  delegate_->Run();
82
0
  delegate_ = NULL;
83
0
}
84
85
DelegateSimpleThreadPool::DelegateSimpleThreadPool(
86
    const std::string& name_prefix,
87
    int num_threads)
88
    : name_prefix_(name_prefix),
89
      num_threads_(num_threads),
90
0
      dry_(true, false) {
91
0
}
92
93
0
DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
94
0
  DCHECK(threads_.empty());
95
0
  DCHECK(delegates_.empty());
96
0
  DCHECK(!dry_.IsSignaled());
97
0
}
98
99
0
void DelegateSimpleThreadPool::Start() {
100
0
  DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
101
0
  for (int i = 0; i < num_threads_; ++i) {
102
0
    DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
103
0
    thread->Start();
104
0
    threads_.push_back(thread);
105
0
  }
106
0
}
107
108
0
void DelegateSimpleThreadPool::JoinAll() {
109
0
  DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
110
111
  // Tell all our threads to quit their worker loop.
112
0
  AddWork(NULL, num_threads_);
113
114
  // Join and destroy all the worker threads.
115
0
  for (int i = 0; i < num_threads_; ++i) {
116
0
    threads_[i]->Join();
117
0
    delete threads_[i];
118
0
  }
119
0
  threads_.clear();
120
0
  DCHECK(delegates_.empty());
121
0
}
122
123
0
void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
124
0
  AutoLock locked(lock_);
125
0
  for (int i = 0; i < repeat_count; ++i)
126
0
    delegates_.push(delegate);
127
  // If we were empty, signal that we have work now.
128
0
  if (!dry_.IsSignaled())
129
0
    dry_.Signal();
130
0
}
131
132
0
void DelegateSimpleThreadPool::Run() {
133
0
  Delegate* work = NULL;
134
135
0
  while (true) {
136
0
    dry_.Wait();
137
0
    {
138
0
      AutoLock locked(lock_);
139
0
      if (!dry_.IsSignaled())
140
0
        continue;
141
142
0
      DCHECK(!delegates_.empty());
143
0
      work = delegates_.front();
144
0
      delegates_.pop();
145
146
      // Signal to any other threads that we're currently out of work.
147
0
      if (delegates_.empty())
148
0
        dry_.Reset();
149
0
    }
150
151
    // A NULL delegate pointer signals us to quit.
152
0
    if (!work)
153
0
      break;
154
155
0
    work->Run();
156
0
  }
157
0
}
158
159
}  // namespace butil