1
#pragma once
2

            
3
#include "source/common/common/logger.h"
4
#include "source/extensions/config_subscription/grpc/grpc_stream.h"
5

            
6
namespace Envoy {
7
namespace Config {
8

            
9
/**
10
 * This class arbitrates between two config providers of the same GrpcMux -
11
 * the primary and the failover. Envoy always prefers fetching config from the
12
 * primary source, but if not available, it will fetch the config from the failover
13
 * source until the primary is again available.
14
 *
15
 * This class owns the state for the GrpcMux primary and failover streams, and
16
 * proxies the gRPC-stream functionality to either the primary or the failover config sources.
17
 * The failover source is optional and will only be used if given in the c'tor.
18
 *
19
 * Failover is supported in both
20
 * SotW (RequestType = envoy::service::discovery::v3::DiscoveryRequest,
21
 *   ResponseType = envoy::service::discovery::v3::DiscoveryResponse), and
22
 * Delta-xDS (RequestType = envoy::service::discovery::v3::DeltaDiscoveryRequest,
23
 *   ResponseType = envoy::service::discovery::v3::DeltaDiscoveryResponse).
24
 * Both the primary and failover streams are either SotW or Delta-xDS.
25
 *
26
 * The use of this class will be as follows: the GrpcMux object will own an instance of
27
 * the GrpcMuxFailover. The GrpcMuxFailover will own 2 GrpcStreams, primary and failover.
28
 * Each of the primary and failover streams will invoke GrpcStreamCallbacks
29
 * on their corresponding objects (also owned by the GrpcMuxFailover). These invocations
30
 * will be followed by the GrpcMuxFailover calling the GrpcStreamCallbacks on the GrpcMux
31
 * object that initialized it.
32
 *
33
 * To simplify the state-machine, Envoy can be in one of the mutually exclusive states:
34
 *   ConnectingToPrimary - attempting to connect to the primary source.
35
 *   ConnectedToPrimary - after receiving a response from the primary source.
36
 *   ConnectingToFailover - attempting to connect to the failover source.
37
 *   ConnectedToFailover - after receiving a response from the failover source.
38
 *   None - not attempting to connect or connected to any source (e.g., upon initialization).
39
 *
40
 * The GrpcMuxFailover attempts to establish a connection to the primary source. Once a response is
41
 * received from the primary source it will be considered available, and the failover will not be
42
 * used. Any future reconnection attempts will be to the primary source only.
43
 * However, if no response is received from the primary source, and accessing the primary has
44
 * failed 2 times in a row, the GrpcMuxFailover will attempt to establish a connection to the
45
 * failover source. Envoy will keep alternating between the primary and failover sources attempting
46
 * to connect to one of them. If a response from the failover source is received, it will be the
47
 * source of configuration until the connection is closed. In case the failover connection is
48
 * closed, Envoy will attempt to connect to the primary, before retrying to connect to the failover
49
 * source. If the failover source is unavailable or a connection to it is closed, the
50
 * GrpcMuxFailover will alternate between attempts to reconnect to the primary source and the
51
 * failover source.
52
 * TODO(adisuissa): The number of consecutive failures is currently statically
53
 * defined, and may be converted to a config field in the future.
54
 */
55
template <class RequestType, class ResponseType>
56
class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
57
                        public Logger::Loggable<Logger::Id::config> {
58
public:
59
  // Backoff, in milliseconds, associated with failover retries.
  // NOTE(review): presumably the delay before re-attempting the primary source once the
  // failover stream disconnects -- it equals the delay used when arming
  // complete_retry_timer_; confirm.
  static constexpr uint32_t DefaultFailoverBackoffMilliseconds = 500;
60

            
61
  // A GrpcStream creator function that receives the stream callbacks and returns a
62
  // GrpcStream object. This is introduced to facilitate dependency injection for
63
  // testing and will be used to create the primary and failover streams.
64
  using GrpcStreamCreator = std::function<GrpcStreamInterfacePtr<RequestType, ResponseType>(
65
      GrpcStreamCallbacks<ResponseType>* stream_callbacks)>;
66

            
67
  // Creates the primary stream and, when a failover creator is supplied, the failover stream.
  //
  // primary_stream_creator: invoked right away with the primary callbacks proxy; must return a
  //   non-null stream.
  // failover_stream_creator: optional. When set it is invoked with the failover callbacks proxy
  //   (must return a non-null stream) and the retry timer is created.
  // grpc_mux_callbacks: receiver of the proxied stream events; stored by reference.
  // dispatcher: only used to create the retry timer when a failover creator is supplied.
  GrpcMuxFailover(GrpcStreamCreator primary_stream_creator,
                  absl::optional<GrpcStreamCreator> failover_stream_creator,
                  GrpcStreamCallbacks<ResponseType>& grpc_mux_callbacks,
                  Event::Dispatcher& dispatcher)
      : grpc_mux_callbacks_(grpc_mux_callbacks), primary_callbacks_(*this),
        // The creator returns its stream by value, so the result is already a temporary;
        // wrapping it in std::move() would only inhibit copy elision.
        primary_grpc_stream_(primary_stream_creator(&primary_callbacks_)),
        connection_state_(ConnectionState::None), ever_connected_to_primary_(false),
        previously_connected_to_(ConnectedTo::None) {
    ASSERT(primary_grpc_stream_ != nullptr);
    if (failover_stream_creator.has_value()) {
      ENVOY_LOG(warn, "Using xDS-Failover. Note that the implementation is currently considered "
                      "experimental and may be modified in future Envoy versions!");
      // Only create the retry timer if failover is supported.
      complete_retry_timer_ = dispatcher.createTimer([this]() -> void { retryConnections(); });
      // The callbacks proxy must exist before the stream that will invoke it is created.
      failover_callbacks_ = std::make_unique<FailoverGrpcStreamCallbacks>(*this);
      failover_grpc_stream_ = (*failover_stream_creator)(failover_callbacks_.get());
      ASSERT(failover_grpc_stream_ != nullptr);
    }
  }
87

            
88
213
  // Virtual, defaulted destructor: no custom cleanup is performed here.
  virtual ~GrpcMuxFailover() = default;
89

            
90
  // Attempts to establish a new stream to the either the primary or failover source.
91
191
  void establishNewStream() override {
92
    // Attempt establishing a connection to the primary source.
93
    // This method may be called multiple times, even if the primary/failover stream
94
    // is already established or in the process of being established.
95
191
    if (complete_retry_timer_) {
96
87
      complete_retry_timer_->disableTimer();
97
87
    }
98
    // If already connected to one of the source, return.
99
191
    if (connection_state_ == ConnectionState::ConnectedToPrimary ||
100
191
        connection_state_ == ConnectionState::ConnectedToFailover) {
101
2
      ENVOY_LOG_MISC(trace,
102
2
                     "Already connected to an xDS server, skipping establishNewStream() call");
103
2
      return;
104
2
    }
105
189
    if (!Runtime::runtimeFeatureEnabled(
106
189
            "envoy.reloadable_features.xds_failover_to_primary_enabled")) {
107
      // Allow stickiness, so if Envoy was ever connected to the primary source only
108
      // retry to reconnect to the primary source, If Envoy was ever connected to the
109
      // failover source then only retry to reconnect to the failover source.
110
6
      if (previously_connected_to_ == ConnectedTo::Primary) {
111
        ENVOY_LOG_MISC(
112
            info, "Previously connected to the primary xDS source, attempting to reconnect to it");
113
        connection_state_ = ConnectionState::ConnectingToPrimary;
114
        primary_grpc_stream_->establishNewStream();
115
        return;
116
6
      } else if (previously_connected_to_ == ConnectedTo::Failover) {
117
1
        ENVOY_LOG_MISC(
118
1
            info, "Previously connected to the failover xDS source, attempting to reconnect to it");
119
1
        connection_state_ = ConnectionState::ConnectingToFailover;
120
1
        failover_grpc_stream_->establishNewStream();
121
1
        return;
122
1
      }
123
6
    }
124
    // connection_state_ is either None, ConnectingToPrimary or
125
    // ConnectingToFailover. In the first 2 cases try to connect to the primary
126
    // (preferring the primary in the case of None), and in the third case
127
    // try to connect to the failover.
128
    // Note that if a connection to the primary source was ever successful, the
129
    // failover manager will keep setting connection_state_ to either None or
130
    // ConnectingToPrimary, which ensures that only the primary stream will be
131
    // established.
132
188
    if (connection_state_ == ConnectionState::ConnectingToFailover) {
133
2
      ASSERT(!ever_connected_to_primary_);
134
2
      failover_grpc_stream_->establishNewStream();
135
186
    } else {
136
186
      ASSERT(connection_state_ == ConnectionState::None ||
137
186
             connection_state_ == ConnectionState::ConnectingToPrimary);
138
186
      connection_state_ = ConnectionState::ConnectingToPrimary;
139
186
      primary_grpc_stream_->establishNewStream();
140
186
    }
141
188
  }
142

            
143
  // Returns the availability of the underlying stream.
144
2546
  bool grpcStreamAvailable() const override {
145
2546
    if (connectingToOrConnectedToFailover()) {
146
378
      return failover_grpc_stream_->grpcStreamAvailable();
147
378
    }
148
    // Either connecting/connected to the primary, or no connection was attempted.
149
2168
    return primary_grpc_stream_->grpcStreamAvailable();
150
2546
  }
151

            
152
  // Sends a message using the underlying stream.
153
1841
  void sendMessage(const RequestType& request) override {
154
1841
    if (connectingToOrConnectedToFailover()) {
155
338
      ASSERT(!ever_connected_to_primary_);
156
338
      failover_grpc_stream_->sendMessage(request);
157
338
      return;
158
338
    }
159
    // Either connecting/connected to the primary, or no connection was attempted.
160
1503
    primary_grpc_stream_->sendMessage(request);
161
1503
  }
162

            
163
  // Updates the queue size of the underlying stream.
164
2556
  void maybeUpdateQueueSizeStat(uint64_t size) override {
165
2556
    if (connectingToOrConnectedToFailover()) {
166
648
      failover_grpc_stream_->maybeUpdateQueueSizeStat(size);
167
648
      return;
168
648
    }
169
    // Either connecting/connected to the primary, or no connection was attempted.
170
1908
    primary_grpc_stream_->maybeUpdateQueueSizeStat(size);
171
1908
  }
172

            
173
  // Returns true if the rate-limit allows draining.
174
1885
  bool checkRateLimitAllowsDrain() override {
175
1885
    if (connectingToOrConnectedToFailover()) {
176
338
      return failover_grpc_stream_->checkRateLimitAllowsDrain();
177
338
    }
178
    // Either connecting/connected to the primary, or no connection was attempted.
179
1547
    return primary_grpc_stream_->checkRateLimitAllowsDrain();
180
1885
  }
181

            
182
  // Returns the close status for testing purposes only.
183
  absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() {
184
    if (connectingToOrConnectedToFailover()) {
185
      return failover_grpc_stream_->getCloseStatusForTest();
186
    }
187
    ASSERT(connectingToOrConnectedToPrimary());
188
    return primary_grpc_stream_->getCloseStatusForTest();
189
  }
190

            
191
  // Returns the current stream for testing purposes only.
192
507
  GrpcStreamInterface<RequestType, ResponseType>& currentStreamForTest() {
193
507
    if (connectingToOrConnectedToFailover()) {
194
      return *failover_grpc_stream_;
195
    }
196
507
    ASSERT(connectingToOrConnectedToPrimary());
197
507
    return *primary_grpc_stream_;
198
507
  };
199

            
200
  // Retries to connect again to the primary and then (possibly) to the
201
  // failover. Assumes that no connection has been made or is being attempted.
202
21
  void retryConnections() {
203
21
    ASSERT(connection_state_ == ConnectionState::None);
204
21
    ENVOY_LOG(trace, "Expired timer, retrying to reconnect to the primary xDS server.");
205
21
    connection_state_ = ConnectionState::ConnectingToPrimary;
206
21
    primary_grpc_stream_->establishNewStream();
207
21
  }
208

            
209
private:
210
  friend class GrpcMuxFailoverTest;
211

            
212
  // A helper class that proxies the callbacks of GrpcStreamCallbacks for the primary service.
213
  class PrimaryGrpcStreamCallbacks : public GrpcStreamCallbacks<ResponseType> {
214
  public:
215
213
    PrimaryGrpcStreamCallbacks(GrpcMuxFailover& parent) : parent_(parent) {}
216

            
217
245
    void onStreamEstablished() override {
218
      // Although onStreamEstablished is invoked on the the primary stream, Envoy
219
      // needs to wait for the first response to be received from it before
220
      // considering the primary source as available.
221
      // Calling the onStreamEstablished() callback on the GrpcMux object will
222
      // trigger the GrpcMux to start sending requests.
223
245
      ASSERT(parent_.connection_state_ == ConnectionState::ConnectingToPrimary);
224
245
      parent_.grpc_mux_callbacks_.onStreamEstablished();
225
245
    }
226

            
227
192
    void onEstablishmentFailure(bool) override {
228
      // This will be called when the primary stream fails to establish a connection, or after the
229
      // connection was closed.
230
192
      ASSERT(parent_.connectingToOrConnectedToPrimary());
231
      // If there's no failover supported, this will just be a pass-through
232
      // callback.
233
192
      if (parent_.failover_grpc_stream_ != nullptr) {
234
174
        if (parent_.connection_state_ == ConnectionState::ConnectingToPrimary &&
235
174
            !parent_.ever_connected_to_primary_) {
236
          // If there are 2 consecutive failures to the primary, Envoy will try to connect to the
237
          // failover.
238
133
          primary_consecutive_failures_++;
239
133
          if (primary_consecutive_failures_ >= 2) {
240
            // The primary stream failed to establish a connection 2 times in a row.
241
            // Terminate the primary stream and establish a connection to the failover stream.
242
74
            ENVOY_LOG(info, "Primary xDS stream failed to establish a connection at least 2 times "
243
74
                            "in a row. Attempting to connect to the failover stream.");
244
            // This will close the stream and prevent the retry timer from
245
            // reconnecting to the primary source.
246
74
            parent_.primary_grpc_stream_->closeStream();
247
            // Next attempt will be to the failover, set the value that
248
            // determines whether to set initial_resource_versions or not.
249
74
            parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
250
74
                                                               ConnectedTo::Failover);
251
74
            parent_.connection_state_ = ConnectionState::ConnectingToFailover;
252
74
            parent_.failover_grpc_stream_->establishNewStream();
253
74
            return;
254
74
          }
255
133
        }
256
174
      }
257
      // Pass along the failure to the GrpcMux object. Retry will be triggered
258
      // later by the underlying grpc stream.
259
118
      ENVOY_LOG_MISC(trace, "Not trying to connect to failover. Will try again to reconnect to the "
260
118
                            "primary (upon retry).");
261
118
      parent_.connection_state_ = ConnectionState::ConnectingToPrimary;
262
      // Next attempt will be to the primary, set the value that
263
      // determines whether to set initial_resource_versions or not.
264
118
      parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
265
118
                                                         ConnectedTo::Primary);
266
118
    }
267

            
268
    void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
269
552
                             ControlPlaneStats& control_plane_stats) override {
270
552
      ASSERT((parent_.connectingToOrConnectedToPrimary()) &&
271
552
             !parent_.connectingToOrConnectedToFailover());
272
      // Received a response from the primary. The primary is now considered available (no failover
273
      // will be attempted).
274
552
      parent_.ever_connected_to_primary_ = true;
275
552
      primary_consecutive_failures_ = 0;
276
552
      parent_.connection_state_ = ConnectionState::ConnectedToPrimary;
277
552
      parent_.previously_connected_to_ = ConnectedTo::Primary;
278
552
      parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats);
279
552
    }
280

            
281
4
    void onWriteable() override {
282
4
      if (parent_.connectingToOrConnectedToPrimary()) {
283
4
        parent_.grpc_mux_callbacks_.onWriteable();
284
4
      }
285
4
    }
286

            
287
  private:
288
    GrpcMuxFailover& parent_;
289
    uint32_t primary_consecutive_failures_{0};
290
  };
291

            
292
  // A helper class that proxies the callbacks of GrpcStreamCallbacks for the failover service.
293
  class FailoverGrpcStreamCallbacks : public GrpcStreamCallbacks<ResponseType> {
294
  public:
295
82
    FailoverGrpcStreamCallbacks(GrpcMuxFailover& parent) : parent_(parent) {}
296

            
297
85
    void onStreamEstablished() override {
298
      // Although the failover stream is considered established, need to wait for the
299
      // the first response to be received before considering the failover available.
300
      // Calling the onStreamEstablished() callback on the GrpcMux object will
301
      // trigger the GrpcMux to start sending requests.
302
85
      ASSERT(parent_.connection_state_ == ConnectionState::ConnectingToFailover);
303
85
      ASSERT(!parent_.ever_connected_to_primary_);
304
85
      parent_.grpc_mux_callbacks_.onStreamEstablished();
305
85
    }
306

            
307
87
    void onEstablishmentFailure(bool) override {
308
      // This will be called when the failover stream fails to establish a connection, or after the
309
      // connection was closed.
310
87
      ASSERT(parent_.connectingToOrConnectedToFailover());
311
87
      if (!Runtime::runtimeFeatureEnabled(
312
87
              "envoy.reloadable_features.xds_failover_to_primary_enabled")) {
313
        // If previously Envoy was connected to the failover, keep using that.
314
        // Otherwise let the retry mechanism try to access the primary (similar
315
        // to if the runtime flag was not set).
316
33
        if (parent_.previously_connected_to_ == ConnectedTo::Failover) {
317
33
          ENVOY_LOG(info,
318
33
                    "Failover xDS stream disconnected (either after establishing a connection or "
319
33
                    "before). Attempting to reconnect to Failover because Envoy successfully "
320
33
                    "connected to it previously.");
321
          // Not closing the failover stream, allows it to use its retry timer
322
          // to reconnect to the failover source.
323
          // Next attempt will be to the failover after Envoy was already
324
          // connected to it. Allowing to send the initial_resource_versions on reconnect.
325
33
          parent_.grpc_mux_callbacks_.onEstablishmentFailure(true);
326
33
          parent_.connection_state_ = ConnectionState::ConnectingToFailover;
327
33
          return;
328
33
        }
329
33
      }
330
      // Either this was an intentional disconnection from the failover source,
331
      // or unintentional. Either way, try to connect to the primary next.
332
54
      ENVOY_LOG(debug,
333
54
                "Failover xDS stream disconnected (either after establishing a connection or "
334
54
                "before). Attempting to connect to the primary stream.");
335

            
336
      // This will close the stream and prevent the retry timer from
337
      // reconnecting to the failover source. The next attempt will be to the
338
      // primary source.
339
54
      parent_.failover_grpc_stream_->closeStream();
340
      // Next attempt will be to the primary, set the value that
341
      // determines whether to set initial_resource_versions or not.
342
54
      parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
343
54
                                                         ConnectedTo::Primary);
344
      // Setting the connection state to None, and when the retry timer will
345
      // expire, Envoy will try to connect to the primary source.
346
54
      parent_.connection_state_ = ConnectionState::None;
347
      // Wait for a short period of time before retrying to reconnect to the
348
      // primary, reducing strain on the network/servers in case of an issue.
349
      // TODO(adisuissa): need to use the primary source's retry timer here, to wait
350
      // for the next time to connect to the primary. This requires a refactor
351
      // of the retry timer and moving it from the grpc_stream to here.
352
54
      parent_.complete_retry_timer_->enableTimer(std::chrono::milliseconds(500));
353
54
    }
354

            
355
    void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
356
117
                             ControlPlaneStats& control_plane_stats) override {
357
117
      ASSERT(parent_.connectingToOrConnectedToFailover());
358
117
      ASSERT(!parent_.ever_connected_to_primary_);
359
      // Received a response from the failover. The failover is now considered available (no going
360
      // back to the primary will be attempted).
361
117
      parent_.connection_state_ = ConnectionState::ConnectedToFailover;
362
117
      parent_.previously_connected_to_ = ConnectedTo::Failover;
363
117
      parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats);
364
117
    }
365

            
366
2
    void onWriteable() override {
367
2
      if (parent_.connectingToOrConnectedToFailover()) {
368
2
        parent_.grpc_mux_callbacks_.onWriteable();
369
2
      }
370
2
    }
371

            
372
  private:
373
    GrpcMuxFailover& parent_;
374
  };
375

            
376
  // Returns true iff the state is connecting to primary or connected to it.
377
14
  bool connectingToOrConnectedToPrimary() const {
378
14
    return connection_state_ == ConnectionState::ConnectingToPrimary ||
379
14
           connection_state_ == ConnectionState::ConnectedToPrimary;
380
14
  }
381

            
382
  // Returns true iff the state is connecting to failover or connected to it.
383
9338
  bool connectingToOrConnectedToFailover() const {
384
9338
    return connection_state_ == ConnectionState::ConnectingToFailover ||
385
9338
           connection_state_ == ConnectionState::ConnectedToFailover;
386
9338
  }
387

            
388
  // The following method overrides are to allow GrpcMuxFailover to extend the
389
  // GrpcStreamInterface. Once envoy.restart_features.xds_failover_support is deprecated,
390
  // the class will no longer need to extend the interface, and these can be removed.
391
  void onCreateInitialMetadata(Http::RequestHeaderMap&) override { PANIC("not implemented"); }
392
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override { PANIC("not implemented"); }
393
  void onReceiveMessage(std::unique_ptr<ResponseType>&&) override { PANIC("not implemented"); }
394
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {
395
    PANIC("not implemented");
396
  }
397
  void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
398
    PANIC("not implemented");
399
  }
400
10
  void closeStream() override {
401
10
    if (connectingToOrConnectedToPrimary()) {
402
9
      ENVOY_LOG_MISC(debug, "Intentionally closing the primary gRPC stream");
403
9
      primary_grpc_stream_->closeStream();
404
9
    } else if (connectingToOrConnectedToFailover()) {
405
1
      ENVOY_LOG_MISC(debug, "Intentionally closing the failover gRPC stream");
406
1
      failover_grpc_stream_->closeStream();
407
1
    }
408
10
  }
409

            
410
  // The stream callbacks that will be invoked on the GrpcMux object, to notify
411
  // about the state of the underlying primary/failover stream.
412
  GrpcStreamCallbacks<ResponseType>& grpc_mux_callbacks_;
413
  // The callbacks that will be invoked by the primary stream.
414
  PrimaryGrpcStreamCallbacks primary_callbacks_;
415
  // The stream to the primary source.
416
  GrpcStreamInterfacePtr<RequestType, ResponseType> primary_grpc_stream_;
417
  // The callbacks that will be invoked by the failover stream.
418
  std::unique_ptr<FailoverGrpcStreamCallbacks> failover_callbacks_;
419
  // The stream to the failover source.
420
  GrpcStreamInterfacePtr<RequestType, ResponseType> failover_grpc_stream_;
421

            
422
  // A timer that allows waiting for some period of time before trying to
423
  // connect again after both primary and failover attempts failed. Only
424
  // initialized when failover is supported.
425
  Event::TimerPtr complete_retry_timer_{nullptr};
426

            
427
  // The mutually exclusive states of the failover state-machine.
  enum class ConnectionState {
    // Not attempting to connect or connected to any source (e.g., upon initialization).
    None,
    // Attempting to connect to the primary source.
    ConnectingToPrimary,
    // A response was received from the primary source.
    ConnectedToPrimary,
    // Attempting to connect to the failover source.
    ConnectingToFailover,
    // A response was received from the failover source.
    ConnectedToFailover
  };
434

            
435
  // The current state of the connections to the primary/failover sources.
  // The object starts in the None state.
  // Once a new stream is attempted, the state becomes ConnectingToPrimary, until
  // a response is received from the primary (the state becomes
  // ConnectedToPrimary), or a failure to establish a connection to the primary
  // occurs. In the latter case, if Envoy attempts to reconnect to the primary,
  // the state stays ConnectingToPrimary, but if it attempts to connect to the
  // failover, the state is set to ConnectingToFailover.
  // If Envoy receives a response from the failover, the state is set to
  // ConnectedToFailover.
  // Note that the states are mutually exclusive, so at any given time Envoy is
  // tracked as connecting to, or connected to, at most one source.
448
  ConnectionState connection_state_;
449

            
450
  // A flag that keeps track of whether Envoy successfully connected to the
451
  // primary source. Envoy is considered successfully connected to a source
452
  // once it receives a response from it.
453
  bool ever_connected_to_primary_{false};
454

            
455
  // Identifies a config source (or none of them).
  enum class ConnectedTo {
    None,
    Primary,
    Failover,
  };
456
  // Used to track the most recent source that Envoy was connected to.
457
  ConnectedTo previously_connected_to_;
458
};
459

            
460
} // namespace Config
461
} // namespace Envoy