Coverage Report

Created: 2026-02-14 06:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/suricata7/rust/src/pgsql/pgsql.rs
Line
Count
Source
1
/* Copyright (C) 2022-2024 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17
18
// Author: Juliana Fajardini <jufajardini@oisf.net>
19
20
//! PostgreSQL parser
21
22
use super::parser::{self, ConsolidatedDataRowPacket, PgsqlBEMessage, PgsqlFEMessage};
23
use crate::applayer::*;
24
use crate::conf::*;
25
use crate::core::{AppProto, Flow, Direction, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
26
use crate::core::sc_app_layer_parser_trigger_raw_stream_reassembly;
27
use nom7::{Err, IResult};
28
use std;
29
use std::collections::VecDeque;
30
use std::ffi::CString;
31
32
pub const PGSQL_CONFIG_DEFAULT_STREAM_DEPTH: u32 = 0;
33
34
static mut ALPROTO_PGSQL: AppProto = ALPROTO_UNKNOWN;
35
36
static mut PGSQL_MAX_TX: usize = 1024;
37
38
/// Progress of one side (request or response) of a `PgsqlTransaction`.
///
/// The declaration order matters: the derived `PartialOrd` is used to test
/// whether a side has not yet reached `TxDone`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub enum PgsqlTxProgress {
    /// Nothing has been seen for this side yet.
    TxInit = 0,
    /// At least one message was seen, but the side is not finished.
    TxReceived = 1,
    /// The side is finished.
    TxDone = 2,
    /// The transaction was forcibly marked as over because too many
    /// transactions were open.
    TxFlushedOut = 3,
}
46
47
#[derive(Debug)]
48
pub struct PgsqlTransaction {
49
    pub tx_id: u64,
50
    pub tx_req_state: PgsqlTxProgress,
51
    pub tx_res_state: PgsqlTxProgress,
52
    pub request: Option<PgsqlFEMessage>,
53
    pub responses: Vec<PgsqlBEMessage>,
54
55
    pub data_row_cnt: u64,
56
    pub data_size: u64,
57
58
    tx_data: AppLayerTxData,
59
}
60
61
impl Transaction for PgsqlTransaction {
    /// Returns the identifier stored in this transaction.
    fn id(&self) -> u64 {
        let Self { tx_id, .. } = self;
        *tx_id
    }
}
66
67
impl Default for PgsqlTransaction {
68
0
    fn default() -> Self {
69
0
        Self::new()
70
0
    }
71
}
72
73
impl PgsqlTransaction {
74
5.64M
    pub fn new() -> Self {
75
5.64M
        Self {
76
5.64M
            tx_id: 0,
77
5.64M
            tx_req_state: PgsqlTxProgress::TxInit,
78
5.64M
            tx_res_state: PgsqlTxProgress::TxInit,
79
5.64M
            request: None,
80
5.64M
            responses: Vec::<PgsqlBEMessage>::new(),
81
5.64M
            data_row_cnt: 0,
82
5.64M
            data_size: 0,
83
5.64M
            tx_data: AppLayerTxData::new(),
84
5.64M
        }
85
5.64M
    }
86
87
178k
    pub fn incr_row_cnt(&mut self) {
88
178k
        self.data_row_cnt = self.data_row_cnt.saturating_add(1);
89
178k
    }
90
91
533k
    pub fn get_row_cnt(&self) -> u64 {
92
533k
        self.data_row_cnt
93
533k
    }
94
95
178k
    pub fn sum_data_size(&mut self, row_size: u64) {
96
178k
        self.data_size += row_size;
97
178k
    }
98
}
99
100
/// Connection-level progress, updated from the last frontend or backend
/// message that caused a state change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgsqlStateProgress {
    /// Initial state of a freshly created `PgsqlState`.
    IdleState,
    // Related to Frontend-received messages //
    SSLRequestReceived,
    StartupMessageReceived,
    SASLInitialResponseReceived,
    SASLResponseReceived,
    PasswordMessageReceived,
    SimpleQueryReceived,
    CancelRequestReceived,
    ConnectionTerminated,
    // Related to Backend-received messages //
    SSLRejectedReceived,
    // SSPIAuthenticationReceived, // TODO implement
    SASLAuthenticationReceived,
    SASLAuthenticationContinueReceived,
    SASLAuthenticationFinalReceived,
    SimpleAuthenticationReceived,
    AuthenticationOkReceived,
    ParameterSetup,
    BackendKeyReceived,
    ReadyForQueryReceived,
    RowDescriptionReceived,
    DataRowReceived,
    CommandCompletedReceived,
    ErrorMessageReceived,
    /// Only available in test builds.
    #[cfg(test)]
    UnknownState,
    /// Set once the backend accepted an SSL request.
    Finished,
}
131
132
#[derive(Debug)]
133
pub struct PgsqlState {
134
    state_data: AppLayerStateData,
135
    tx_id: u64,
136
    transactions: VecDeque<PgsqlTransaction>,
137
    request_gap: bool,
138
    response_gap: bool,
139
    backend_secret_key: u32,
140
    backend_pid: u32,
141
    state_progress: PgsqlStateProgress,
142
    tx_index_completed: usize,
143
}
144
145
impl State<PgsqlTransaction> for PgsqlState {
    /// Number of transactions currently held by this state.
    fn get_transaction_count(&self) -> usize {
        let live = &self.transactions;
        live.len()
    }

    /// Transaction stored at position `index`, or `None` when out of range.
    fn get_transaction_by_index(&self, index: usize) -> Option<&PgsqlTransaction> {
        let live = &self.transactions;
        live.get(index)
    }
}
154
155
impl Default for PgsqlState {
156
0
    fn default() -> Self {
157
0
        Self::new()
158
0
    }
159
}
160
161
impl PgsqlState {
162
9.75k
    pub fn new() -> Self {
163
9.75k
        Self {
164
9.75k
            state_data: AppLayerStateData::new(),
165
9.75k
            tx_id: 0,
166
9.75k
            transactions: VecDeque::new(),
167
9.75k
            request_gap: false,
168
9.75k
            response_gap: false,
169
9.75k
            backend_secret_key: 0,
170
9.75k
            backend_pid: 0,
171
9.75k
            state_progress: PgsqlStateProgress::IdleState,
172
9.75k
            tx_index_completed: 0,
173
9.75k
        }
174
9.75k
    }
175
176
    // Free a transaction by ID.
177
3.37M
    fn free_tx(&mut self, tx_id: u64) {
178
3.37M
        let len = self.transactions.len();
179
3.37M
        let mut found = false;
180
3.37M
        let mut index = 0;
181
11.6M
        for i in 0..len {
182
11.6M
            let tx = &self.transactions[i];
183
11.6M
            if tx.tx_id == tx_id + 1 {
184
3.37M
                found = true;
185
3.37M
                index = i;
186
3.37M
                break;
187
8.23M
            }
188
        }
189
3.37M
        if found {
190
3.37M
            self.tx_index_completed = 0;
191
3.37M
            self.transactions.remove(index);
192
3.37M
        }
193
3.37M
    }
194
195
0
    /// Looks up the first transaction whose stored id equals `tx_id + 1`.
    pub fn get_tx(&mut self, tx_id: u64) -> Option<&PgsqlTransaction> {
        for tx in self.transactions.iter() {
            if tx.tx_id == tx_id + 1 {
                return Some(tx);
            }
        }
        None
    }
198
199
5.64M
    fn new_tx(&mut self) -> PgsqlTransaction {
200
5.64M
        let mut tx = PgsqlTransaction::new();
201
5.64M
        self.tx_id += 1;
202
5.64M
        tx.tx_id = self.tx_id;
203
        SCLogDebug!("Creating new transaction. tx_id: {}", tx.tx_id);
204
5.64M
        if self.transactions.len() > unsafe { PGSQL_MAX_TX } + self.tx_index_completed {
205
            // If there are too many open transactions,
206
            // mark the earliest ones as completed, and take care
207
            // to avoid quadratic complexity
208
645k
            let mut index = self.tx_index_completed;
209
5.05M
            for tx_old in &mut self.transactions.range_mut(self.tx_index_completed..) {
210
5.05M
                index += 1;
211
5.05M
                if tx_old.tx_res_state < PgsqlTxProgress::TxDone {
212
640k
                    tx_old.tx_data.updated_tc = true;
213
640k
                    tx_old.tx_data.updated_ts = true;
214
                    // we don't check for TxReqDone for the majority of requests are basically completed
215
                    // when they're parsed, as of now
216
640k
                    tx_old.tx_req_state = PgsqlTxProgress::TxFlushedOut;
217
640k
                    tx_old.tx_res_state = PgsqlTxProgress::TxFlushedOut;
218
                    //TODO set event
219
640k
                    break;
220
4.41M
                }
221
            }
222
645k
            self.tx_index_completed = index;
223
5.00M
        }
224
5.64M
        return tx;
225
5.64M
    }
226
227
    /// Find or create a new transaction
228
    ///
229
    /// If a new transaction is created, push that into state.transactions before returning &mut to last tx
230
    /// If we can't find a transaction and we should not create one, we return None
231
    /// The moment when this is called will may impact the logic of transaction tracking (e.g. when a tx is considered completed)
232
    // TODO A future, improved version may be based on current message type and dir, too
233
6.50M
    fn find_or_create_tx(&mut self) -> Option<&mut PgsqlTransaction> {
234
        // First, check if we should create a new tx (in case the other was completed or there's no tx yet)
235
6.50M
        if self.state_progress == PgsqlStateProgress::IdleState
236
6.17M
            || self.state_progress == PgsqlStateProgress::StartupMessageReceived
237
6.16M
            || self.state_progress == PgsqlStateProgress::PasswordMessageReceived
238
6.15M
            || self.state_progress == PgsqlStateProgress::SASLInitialResponseReceived
239
6.15M
            || self.state_progress == PgsqlStateProgress::SASLResponseReceived
240
6.15M
            || self.state_progress == PgsqlStateProgress::SimpleQueryReceived
241
6.15M
            || self.state_progress == PgsqlStateProgress::SSLRequestReceived
242
5.79M
            || self.state_progress == PgsqlStateProgress::ConnectionTerminated
243
858k
            || self.state_progress == PgsqlStateProgress::CancelRequestReceived
244
5.64M
        {
245
5.64M
            let tx = self.new_tx();
246
5.64M
            self.transactions.push_back(tx);
247
5.64M
        }
248
        // If we don't need a new transaction, just return the current one
249
        SCLogDebug!("find_or_create state is {:?}", &self.state_progress);
250
6.50M
        return self.transactions.back_mut();
251
6.50M
    }
252
253
    /// Define PgsqlState progression, based on the request received
254
    ///
255
    /// As PostgreSQL transactions can have multiple messages, State progression
256
    /// is what helps us keep track of the PgsqlTransactions - when one finished
257
    /// when the other starts.
258
    /// State isn't directly updated to avoid reference borrowing conflicts.
259
5.57M
    fn request_next_state(request: &PgsqlFEMessage) -> Option<PgsqlStateProgress> {
260
5.57M
        match request {
261
355k
            PgsqlFEMessage::SSLRequest(_) => Some(PgsqlStateProgress::SSLRequestReceived),
262
9.80k
            PgsqlFEMessage::StartupMessage(_) => Some(PgsqlStateProgress::StartupMessageReceived),
263
756
            PgsqlFEMessage::PasswordMessage(_) => Some(PgsqlStateProgress::PasswordMessageReceived),
264
            PgsqlFEMessage::SASLInitialResponse(_) => {
265
337
                Some(PgsqlStateProgress::SASLInitialResponseReceived)
266
            }
267
1.86k
            PgsqlFEMessage::SASLResponse(_) => Some(PgsqlStateProgress::SASLResponseReceived),
268
            PgsqlFEMessage::SimpleQuery(_) => {
269
                SCLogDebug!("Match: SimpleQuery");
270
2.05k
                Some(PgsqlStateProgress::SimpleQueryReceived)
271
                // TODO here we may want to save the command that was received, to compare that later on when we receive command completed?
272
273
                // Important to keep in mind that: "In simple Query mode, the format of retrieved values is always text, except when the given command is a FETCH from a cursor declared with the BINARY option. In that case, the retrieved values are in binary format. The format codes given in the RowDescription message tell which format is being used." (from pgsql official documentation)
274
            }
275
416
            PgsqlFEMessage::CancelRequest(_) => Some(PgsqlStateProgress::CancelRequestReceived),
276
            PgsqlFEMessage::Terminate(_) => {
277
                SCLogDebug!("Match: Terminate message");
278
4.92M
                Some(PgsqlStateProgress::ConnectionTerminated)
279
            }
280
            PgsqlFEMessage::UnknownMessageType(_) => {
281
                SCLogDebug!("Match: Unknown message type");
282
                // Not changing state when we don't know the message
283
275k
                None
284
            }
285
        }
286
5.57M
    }
287
288
5.88M
    fn state_based_req_parsing(
289
5.88M
        state: PgsqlStateProgress, input: &[u8],
290
5.88M
    ) -> IResult<&[u8], parser::PgsqlFEMessage> {
291
5.88M
        match state {
292
            PgsqlStateProgress::SASLAuthenticationReceived => {
293
6.00k
                parser::parse_sasl_initial_response(input)
294
            }
295
            PgsqlStateProgress::SASLInitialResponseReceived
296
            | PgsqlStateProgress::SASLAuthenticationContinueReceived => {
297
4.65k
                parser::parse_sasl_response(input)
298
            }
299
            PgsqlStateProgress::SimpleAuthenticationReceived => {
300
3.37k
                parser::parse_password_message(input)
301
            }
302
5.87M
            _ => parser::parse_request(input),
303
        }
304
5.88M
    }
305
306
    /// Process State progress to decide if request is finished
307
    ///
308
5.29M
    fn request_is_complete(state: PgsqlStateProgress) -> bool {
309
5.29M
        match state {
310
            PgsqlStateProgress::SSLRequestReceived
311
            | PgsqlStateProgress::StartupMessageReceived
312
            | PgsqlStateProgress::SimpleQueryReceived
313
            | PgsqlStateProgress::PasswordMessageReceived
314
            | PgsqlStateProgress::SASLInitialResponseReceived
315
            | PgsqlStateProgress::SASLResponseReceived
316
            | PgsqlStateProgress::CancelRequestReceived
317
5.29M
            | PgsqlStateProgress::ConnectionTerminated => true,
318
0
            _ => false,
319
        }
320
5.29M
    }
321
322
338k
    fn parse_request(&mut self, flow: *const Flow, input: &[u8]) -> AppLayerResult {
323
        // We're not interested in empty requests.
324
338k
        if input.is_empty() {
325
0
            return AppLayerResult::ok();
326
338k
        }
327
328
        // If there was gap, check we can sync up again.
329
338k
        if self.request_gap {
330
0
            if parser::parse_request(input).is_ok() {
331
                // The parser now needs to decide what to do as we are not in sync.
332
                // For now, we'll just try again next time.
333
                SCLogDebug!("Suricata interprets there's a gap in the request");
334
0
                return AppLayerResult::ok();
335
0
            }
336
337
            // It looks like we're in sync with the message header
338
            // clear gap state and keep parsing.
339
0
            self.request_gap = false;
340
338k
        }
341
342
338k
        let mut start = input;
343
5.90M
        while !start.is_empty() {
344
            SCLogDebug!(
345
                "In 'parse_request' State Progress is: {:?}",
346
                &self.state_progress
347
            );
348
5.88M
            match PgsqlState::state_based_req_parsing(self.state_progress, start) {
349
5.57M
                Ok((rem, request)) => {
350
5.57M
                    start = rem;
351
5.57M
                    let new_state = PgsqlState::request_next_state(&request);
352
353
5.57M
                    if let Some(state) = new_state {
354
5.29M
                        self.state_progress = state;
355
5.29M
                    };
356
                    // PostreSQL progress states can be represented as a finite state machine
357
                    // After the connection phase, the backend/ server will be mostly waiting in a state of `ReadyForQuery`, unless
358
                    // it's processing some request.
359
                    // When the frontend wants to cancel a request, it will send a CancelRequest message over a new connection - to
360
                    // which there won't be any responses.
361
                    // If the frontend wants to terminate the connection, the backend won't send any confirmation after receiving a
362
                    // Terminate request.
363
                    // A simplified finite state machine for PostgreSQL v3 can be found at:
364
                    // https://samadhiweb.com/blog/2013.04.28.graphviz.postgresv3.html
365
5.57M
                    if let Some(tx) = self.find_or_create_tx() {
366
5.56M
                        tx.tx_data.updated_ts = true;
367
5.56M
                        tx.request = Some(request);
368
5.56M
                        if let Some(state) = new_state {
369
5.29M
                            if Self::request_is_complete(state) {
370
                                // The request is always complete at this point
371
5.29M
                                tx.tx_req_state = PgsqlTxProgress::TxDone;
372
5.29M
                                if state == PgsqlStateProgress::ConnectionTerminated
373
370k
                                    || state == PgsqlStateProgress::CancelRequestReceived
374
4.92M
                                {
375
4.92M
                                    /* The server won't send any responses to such requests, so transaction should be over */
376
4.92M
                                    tx.tx_res_state = PgsqlTxProgress::TxDone;
377
4.92M
                                }
378
5.29M
                                sc_app_layer_parser_trigger_raw_stream_reassembly(
379
5.29M
                                    flow,
380
5.29M
                                    Direction::ToServer as i32,
381
                                );
382
0
                            }
383
263k
                        }
384
                    } else {
385
                        // If there isn't a new transaction, we'll consider Suri should move on
386
12.6k
                        return AppLayerResult::ok();
387
                    };
388
                }
389
312k
                Err(Err::Incomplete(_needed)) => {
390
312k
                    let consumed = input.len() - start.len();
391
312k
                    let needed_estimation = start.len() + 1;
392
                    SCLogDebug!(
393
                        "Needed: {:?}, estimated needed: {:?}",
394
                        _needed,
395
                        needed_estimation
396
                    );
397
312k
                    return AppLayerResult::incomplete(consumed as u32, needed_estimation as u32);
398
                }
399
                Err(_) => {
400
1.11k
                    return AppLayerResult::err();
401
                }
402
            }
403
        }
404
405
        // Input was fully consumed.
406
12.2k
        return AppLayerResult::ok();
407
338k
    }
408
409
    /// When the state changes based on a specific response, there are other actions we may need to perform
410
    ///
411
    /// If there is data from the backend message that Suri should store separately in the State or
412
    /// Transaction, that is also done here
413
931k
    fn response_process_next_state(
414
931k
        &mut self, response: &PgsqlBEMessage, f: *const Flow,
415
931k
    ) -> Option<PgsqlStateProgress> {
416
1.29k
        match response {
417
            PgsqlBEMessage::SSLResponse(parser::SSLResponseMessage::SSLAccepted) => {
418
                SCLogDebug!("SSL Request accepted");
419
3
                unsafe {
420
3
                    AppLayerRequestProtocolTLSUpgrade(f);
421
3
                }
422
3
                Some(PgsqlStateProgress::Finished)
423
            }
424
            PgsqlBEMessage::SSLResponse(parser::SSLResponseMessage::SSLRejected) => {
425
                SCLogDebug!("SSL Request rejected");
426
1.29k
                Some(PgsqlStateProgress::SSLRejectedReceived)
427
            }
428
            PgsqlBEMessage::AuthenticationSASL(_) => {
429
1.21k
                Some(PgsqlStateProgress::SASLAuthenticationReceived)
430
            }
431
            PgsqlBEMessage::AuthenticationSASLContinue(_) => {
432
2.34k
                Some(PgsqlStateProgress::SASLAuthenticationContinueReceived)
433
            }
434
            PgsqlBEMessage::AuthenticationSASLFinal(_) => {
435
664
                Some(PgsqlStateProgress::SASLAuthenticationFinalReceived)
436
            }
437
            PgsqlBEMessage::AuthenticationOk(_) => {
438
563
                Some(PgsqlStateProgress::AuthenticationOkReceived)
439
            }
440
9.98k
            PgsqlBEMessage::ParameterStatus(_) => Some(PgsqlStateProgress::ParameterSetup),
441
            PgsqlBEMessage::BackendKeyData(_) => {
442
76.1k
                let backend_info = response.get_backendkey_info();
443
76.1k
                self.backend_pid = backend_info.0;
444
76.1k
                self.backend_secret_key = backend_info.1;
445
76.1k
                Some(PgsqlStateProgress::BackendKeyReceived)
446
            }
447
210k
            PgsqlBEMessage::ReadyForQuery(_) => Some(PgsqlStateProgress::ReadyForQueryReceived),
448
            // TODO should we store any Parameter Status in PgsqlState?
449
            PgsqlBEMessage::AuthenticationMD5Password(_)
450
            | PgsqlBEMessage::AuthenticationCleartextPassword(_) => {
451
1.41k
                Some(PgsqlStateProgress::SimpleAuthenticationReceived)
452
            }
453
2.26k
            PgsqlBEMessage::RowDescription(_) => Some(PgsqlStateProgress::RowDescriptionReceived),
454
179k
            PgsqlBEMessage::ConsolidatedDataRow(msg) => {
455
                // Increment tx.data_size here, since we know msg type, so that we can later on log that info
456
179k
                self.transactions.back_mut()?.sum_data_size(msg.data_size);
457
178k
                Some(PgsqlStateProgress::DataRowReceived)
458
            }
459
            PgsqlBEMessage::CommandComplete(_) => {
460
                // TODO Do we want to compare the command that was stored when
461
                // query was sent with what we received here?
462
267k
                Some(PgsqlStateProgress::CommandCompletedReceived)
463
            }
464
4.58k
            PgsqlBEMessage::ErrorResponse(_) => Some(PgsqlStateProgress::ErrorMessageReceived),
465
            _ => {
466
                // We don't always have to change current state when we see a response...
467
172k
                None
468
            }
469
        }
470
931k
    }
471
472
1.24M
    fn state_based_resp_parsing(
473
1.24M
        state: PgsqlStateProgress, input: &[u8],
474
1.24M
    ) -> IResult<&[u8], parser::PgsqlBEMessage> {
475
1.24M
        if state == PgsqlStateProgress::SSLRequestReceived {
476
1.29k
            parser::parse_ssl_response(input)
477
        } else {
478
1.24M
            parser::pgsql_parse_response(input)
479
        }
480
1.24M
    }
481
482
    /// Process State progress to decide if response is finished
483
    ///
484
296k
    fn response_is_complete(state: PgsqlStateProgress) -> bool {
485
296k
        match state {
486
            PgsqlStateProgress::ReadyForQueryReceived
487
            | PgsqlStateProgress::SSLRejectedReceived
488
            | PgsqlStateProgress::SimpleAuthenticationReceived
489
            | PgsqlStateProgress::SASLAuthenticationReceived
490
            | PgsqlStateProgress::SASLAuthenticationContinueReceived
491
            | PgsqlStateProgress::SASLAuthenticationFinalReceived
492
212k
            | PgsqlStateProgress::Finished => true,
493
83.6k
            _ => false,
494
        }
495
296k
    }
496
497
339k
    fn parse_response(&mut self, flow: *const Flow, input: &[u8]) -> AppLayerResult {
498
        // We're not interested in empty responses.
499
339k
        if input.is_empty() {
500
0
            return AppLayerResult::ok();
501
339k
        }
502
503
339k
        if self.response_gap {
504
0
            if !probe_tc(input) {
505
                // Out of sync, we'll just try again next time.
506
                SCLogDebug!("Suricata interprets there's a gap in the response");
507
0
                return AppLayerResult::ok();
508
0
            }
509
510
            // It seems we're in sync with a message header, clear gap state and keep parsing.
511
0
            self.response_gap = false;
512
339k
        }
513
514
339k
        let mut start = input;
515
1.25M
        while !start.is_empty() {
516
1.24M
            match PgsqlState::state_based_resp_parsing(self.state_progress, start) {
517
931k
                Ok((rem, response)) => {
518
931k
                    start = rem;
519
                    SCLogDebug!("Response is {:?}", &response);
520
931k
                    let new_state = self.response_process_next_state(&response, flow);
521
931k
                    if let Some(state) = new_state {
522
757k
                        self.state_progress = state;
523
757k
                    }
524
931k
                    if let Some(tx) = self.find_or_create_tx() {
525
914k
                        tx.tx_data.updated_tc = true;
526
914k
                        if tx.tx_res_state == PgsqlTxProgress::TxInit {
527
177k
                            tx.tx_res_state = PgsqlTxProgress::TxReceived;
528
737k
                        }
529
914k
                        if let Some(state) = new_state {
530
741k
                            if state == PgsqlStateProgress::DataRowReceived {
531
178k
                                tx.incr_row_cnt();
532
562k
                            } else if state == PgsqlStateProgress::CommandCompletedReceived
533
267k
                                && tx.get_row_cnt() > 0
534
266k
                            {
535
266k
                                // let's summarize the info from the data_rows in one response
536
266k
                                let dummy_resp = PgsqlBEMessage::ConsolidatedDataRow(
537
266k
                                    ConsolidatedDataRowPacket {
538
266k
                                        identifier: b'D',
539
266k
                                        row_cnt: tx.get_row_cnt(),
540
266k
                                        data_size: tx.data_size, // total byte count of all data_row messages combined
541
266k
                                    },
542
266k
                                );
543
266k
                                tx.responses.push(dummy_resp);
544
266k
                                tx.responses.push(response);
545
266k
                            } else {
546
296k
                                tx.responses.push(response);
547
296k
                                if Self::response_is_complete(state) {
548
212k
                                    tx.tx_req_state = PgsqlTxProgress::TxDone;
549
212k
                                    tx.tx_res_state = PgsqlTxProgress::TxDone;
550
212k
                                    sc_app_layer_parser_trigger_raw_stream_reassembly(
551
212k
                                        flow,
552
212k
                                        Direction::ToClient as i32,
553
212k
                                    );
554
212k
                                }
555
                            }
556
172k
                        }
557
                    } else {
558
                        // If there isn't a new transaction, we'll consider Suri should move on
559
17.1k
                        return AppLayerResult::ok();
560
                    };
561
                }
562
316k
                Err(Err::Incomplete(_needed)) => {
563
316k
                    let consumed = input.len() - start.len();
564
316k
                    let needed_estimation = start.len() + 1;
565
                    SCLogDebug!(
566
                        "Needed: {:?}, estimated needed: {:?}, start is {:?}",
567
                        _needed,
568
                        needed_estimation,
569
                        &start
570
                    );
571
316k
                    return AppLayerResult::incomplete(consumed as u32, needed_estimation as u32);
572
                }
573
                Err(_) => {
574
                    SCLogDebug!("Error while parsing PostgreSQL response");
575
575
                    return AppLayerResult::err();
576
                }
577
            }
578
        }
579
580
        // All input was fully consumed.
581
5.14k
        return AppLayerResult::ok();
582
339k
    }
583
584
0
    fn on_request_gap(&mut self, _size: u32) {
585
0
        self.request_gap = true;
586
0
    }
587
588
0
    fn on_response_gap(&mut self, _size: u32) {
589
0
        self.response_gap = true;
590
0
    }
591
}
592
593
/// Probe for a valid PostgreSQL response
594
///
595
/// Currently, for parser usage only. We have a bit more logic in the function
596
/// used by the engine.
597
/// PGSQL messages don't have a header per se, so we parse the slice for an ok()
598
0
fn probe_tc(input: &[u8]) -> bool {
599
0
    if parser::pgsql_parse_response(input).is_ok() || parser::parse_ssl_response(input).is_ok() {
600
0
        return true;
601
0
    }
602
    SCLogDebug!("probe_tc is false");
603
0
    false
604
0
}
605
606
3.37M
/// Reads the request-side progress of the transaction behind `tx`.
///
/// `tx` is cast to a `PgsqlTransaction` without any check, so it must point
/// to a live one.
fn pgsql_tx_get_req_state(tx: *mut std::os::raw::c_void) -> PgsqlTxProgress {
    let tx_safe: &mut PgsqlTransaction = unsafe { cast_pointer!(tx, PgsqlTransaction) };
    tx_safe.tx_req_state
}
613
614
60.0M
/// Reads the response-side progress of the transaction behind `tx`.
///
/// `tx` is cast to a `PgsqlTransaction` without any check, so it must point
/// to a live one.
fn pgsql_tx_get_res_state(tx: *mut std::os::raw::c_void) -> PgsqlTxProgress {
    let tx_safe: &mut PgsqlTransaction = unsafe { cast_pointer!(tx, PgsqlTransaction) };
    tx_safe.tx_res_state
}
621
622
// C exports.
623
624
/// C entry point for a probing parser.
625
#[no_mangle]
626
230k
pub unsafe extern "C" fn SCPgsqlProbingParserTS(
627
230k
    _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
628
230k
) -> AppProto {
629
230k
    if input_len >= 1 && !input.is_null() {
630
631
228k
        let slice: &[u8] = build_slice!(input, input_len as usize);
632
633
228k
        match parser::parse_request(slice) {
634
67.1k
            Ok((_, request)) => {
635
67.1k
                if let PgsqlFEMessage::UnknownMessageType(_) = request {
636
17
                    return ALPROTO_FAILED;
637
67.1k
                }
638
67.1k
                return ALPROTO_PGSQL;
639
            }
640
            Err(Err::Incomplete(_)) => {
641
161k
                return ALPROTO_UNKNOWN;
642
            }
643
392
            Err(_e) => {
644
392
                return ALPROTO_FAILED;
645
            }
646
        }
647
2.25k
    }
648
2.25k
    return ALPROTO_UNKNOWN;
649
230k
}
650
651
/// C entry point for a probing parser.
652
#[no_mangle]
653
195k
pub unsafe extern "C" fn SCPgsqlProbingParserTC(
654
195k
    _flow: *const Flow, _direction: u8, input: *const u8, input_len: u32, _rdir: *mut u8,
655
195k
) -> AppProto {
656
195k
    if input_len >= 1 && !input.is_null() {
657
658
194k
        let slice: &[u8] = build_slice!(input, input_len as usize);
659
660
194k
        if parser::parse_ssl_response(slice).is_ok() {
661
628
            return ALPROTO_PGSQL;
662
193k
        }
663
664
193k
        match parser::pgsql_parse_response(slice) {
665
48.9k
            Ok((_, response)) => {
666
48.9k
                if let PgsqlBEMessage::UnknownMessageType(_) = response {
667
28
                    return ALPROTO_FAILED;
668
48.9k
                }
669
48.9k
                return ALPROTO_PGSQL;
670
            }
671
            Err(Err::Incomplete(_)) => {
672
144k
                return ALPROTO_UNKNOWN;
673
            }
674
146
            Err(_e) => {
675
146
                return ALPROTO_FAILED;
676
            }
677
        }
678
886
    }
679
886
    return ALPROTO_UNKNOWN;
680
195k
}
681
682
#[no_mangle]
683
9.75k
pub extern "C" fn SCPgsqlStateNew(
684
9.75k
    _orig_state: *mut std::os::raw::c_void, _orig_proto: AppProto,
685
9.75k
) -> *mut std::os::raw::c_void {
686
9.75k
    let state = PgsqlState::new();
687
9.75k
    let boxed = Box::new(state);
688
9.75k
    return Box::into_raw(boxed) as *mut _;
689
9.75k
}
690
691
#[no_mangle]
692
9.75k
pub extern "C" fn SCPgsqlStateFree(state: *mut std::os::raw::c_void) {
693
    // Just unbox...
694
9.75k
    std::mem::drop(unsafe { Box::from_raw(state as *mut PgsqlState) });
695
9.75k
}
696
697
#[no_mangle]
698
3.37M
pub extern "C" fn SCPgsqlStateTxFree(state: *mut std::os::raw::c_void, tx_id: u64) {
699
    let state_safe: &mut PgsqlState;
700
3.37M
    unsafe {
701
3.37M
        state_safe = cast_pointer!(state, PgsqlState);
702
3.37M
    }
703
3.37M
    state_safe.free_tx(tx_id);
704
3.37M
}
705
706
#[no_mangle]
707
338k
pub unsafe extern "C" fn SCPgsqlParseRequest(
708
338k
    flow: *const Flow, state: *mut std::os::raw::c_void, pstate: *mut std::os::raw::c_void,
709
338k
    stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
710
338k
) -> AppLayerResult {
711
338k
    if stream_slice.is_empty() {
712
0
        if AppLayerParserStateIssetFlag(pstate, APP_LAYER_PARSER_EOF_TS) > 0 {
713
            SCLogDebug!(" Suricata reached `eof`");
714
0
            return AppLayerResult::ok();
715
        } else {
716
0
            return AppLayerResult::err();
717
        }
718
338k
    }
719
720
721
338k
    let state_safe: &mut PgsqlState = cast_pointer!(state, PgsqlState);
722
723
338k
    if stream_slice.is_gap() {
724
0
        state_safe.on_request_gap(stream_slice.gap_size());
725
338k
    } else if !stream_slice.is_empty() {
726
338k
        return state_safe.parse_request(flow, stream_slice.as_slice());
727
0
    }
728
0
    AppLayerResult::ok()
729
338k
}
730
731
#[no_mangle]
732
339k
pub unsafe extern "C" fn SCPgsqlParseResponse(
733
339k
    flow: *const Flow, state: *mut std::os::raw::c_void, pstate: *mut std::os::raw::c_void,
734
339k
    stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
735
339k
) -> AppLayerResult {
736
339k
    if stream_slice.is_empty() {
737
0
        if AppLayerParserStateIssetFlag(pstate, APP_LAYER_PARSER_EOF_TC) > 0 {
738
0
            return AppLayerResult::ok();
739
        } else {
740
0
            return AppLayerResult::err();
741
        }
742
339k
    }
743
744
339k
    let state_safe: &mut PgsqlState = cast_pointer!(state, PgsqlState);
745
746
339k
    if stream_slice.is_gap() {
747
0
        state_safe.on_response_gap(stream_slice.gap_size());
748
339k
    } else if !stream_slice.is_empty() {
749
339k
        return state_safe.parse_response(flow, stream_slice.as_slice());
750
0
    }
751
0
    AppLayerResult::ok()
752
339k
}
753
754
#[no_mangle]
755
0
pub unsafe extern "C" fn SCPgsqlStateGetTx(
756
0
    state: *mut std::os::raw::c_void, tx_id: u64,
757
0
) -> *mut std::os::raw::c_void {
758
0
    let state_safe: &mut PgsqlState = cast_pointer!(state, PgsqlState);
759
0
    match state_safe.get_tx(tx_id) {
760
0
        Some(tx) => {
761
0
            return tx as *const _ as *mut _;
762
        }
763
        None => {
764
0
            return std::ptr::null_mut();
765
        }
766
    }
767
0
}
768
769
#[no_mangle]
770
2.02M
pub extern "C" fn SCPgsqlStateGetTxCount(state: *mut std::os::raw::c_void) -> u64 {
771
    let state_safe: &mut PgsqlState;
772
2.02M
    unsafe {
773
2.02M
        state_safe = cast_pointer!(state, PgsqlState);
774
2.02M
    }
775
2.02M
    return state_safe.tx_id;
776
2.02M
}
777
778
#[no_mangle]
779
63.3M
pub unsafe extern "C" fn SCPgsqlTxGetALStateProgress(
780
63.3M
    tx: *mut std::os::raw::c_void, direction: u8,
781
63.3M
) -> std::os::raw::c_int {
782
63.3M
    if direction == Direction::ToServer as u8 {
783
3.37M
        return pgsql_tx_get_req_state(tx) as i32;
784
60.0M
    }
785
786
    // Direction has only two possible values, so we don't need to check for the other one
787
60.0M
    pgsql_tx_get_res_state(tx) as i32
788
63.3M
}
789
790
export_tx_data_get!(rs_pgsql_get_tx_data, PgsqlTransaction);
791
export_state_data_get!(rs_pgsql_get_state_data, PgsqlState);
792
793
// Parser name as a NUL-terminated, C-style string.
794
const PARSER_NAME: &[u8] = b"pgsql\0";
795
796
#[no_mangle]
797
34
pub unsafe extern "C" fn SCRegisterPgsqlParser() {
798
34
    let default_port = CString::new("[5432]").unwrap();
799
34
    let mut stream_depth = PGSQL_CONFIG_DEFAULT_STREAM_DEPTH;
800
34
    let parser = RustParser {
801
34
        name: PARSER_NAME.as_ptr() as *const std::os::raw::c_char,
802
34
        default_port: default_port.as_ptr(),
803
34
        ipproto: IPPROTO_TCP,
804
34
        probe_ts: Some(SCPgsqlProbingParserTS),
805
34
        probe_tc: Some(SCPgsqlProbingParserTC),
806
34
        min_depth: 0,
807
34
        max_depth: 16,
808
34
        state_new: SCPgsqlStateNew,
809
34
        state_free: SCPgsqlStateFree,
810
34
        tx_free: SCPgsqlStateTxFree,
811
34
        parse_ts: SCPgsqlParseRequest,
812
34
        parse_tc: SCPgsqlParseResponse,
813
34
        get_tx_count: SCPgsqlStateGetTxCount,
814
34
        get_tx: SCPgsqlStateGetTx,
815
34
        tx_comp_st_ts: PgsqlTxProgress::TxDone as i32,
816
34
        tx_comp_st_tc: PgsqlTxProgress::TxDone as i32,
817
34
        tx_get_progress: SCPgsqlTxGetALStateProgress,
818
34
        get_eventinfo: None,
819
34
        get_eventinfo_byid: None,
820
34
        localstorage_new: None,
821
34
        localstorage_free: None,
822
34
        get_tx_files: None,
823
34
        get_tx_iterator: Some(
824
34
            crate::applayer::state_get_tx_iterator::<PgsqlState, PgsqlTransaction>,
825
34
        ),
826
34
        get_tx_data: rs_pgsql_get_tx_data,
827
34
        get_state_data: rs_pgsql_get_state_data,
828
34
        apply_tx_config: None,
829
34
        flags: APP_LAYER_PARSER_OPT_ACCEPT_GAPS,
830
34
        truncate: None,
831
34
        get_frame_id_by_name: None,
832
34
        get_frame_name_by_id: None,
833
34
    };
834
835
34
    let ip_proto_str = CString::new("tcp").unwrap();
836
837
34
    if AppLayerProtoDetectConfProtoDetectionEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
838
34
        let alproto = AppLayerRegisterProtocolDetection(&parser, 1);
839
34
        ALPROTO_PGSQL = alproto;
840
34
        if AppLayerParserConfParserEnabled(ip_proto_str.as_ptr(), parser.name) != 0 {
841
34
            let _ = AppLayerRegisterParser(&parser, alproto);
842
34
        }
843
        SCLogDebug!("Rust pgsql parser registered.");
844
34
        let retval = conf_get("app-layer.protocols.pgsql.stream-depth");
845
34
        if let Some(val) = retval {
846
0
            match get_memval(val) {
847
0
                Ok(retval) => {
848
0
                    stream_depth = retval as u32;
849
0
                }
850
                Err(_) => {
851
0
                    SCLogError!("Invalid depth value");
852
                }
853
            }
854
0
            AppLayerParserSetStreamDepth(IPPROTO_TCP, ALPROTO_PGSQL, stream_depth)
855
34
        }
856
34
        if let Some(val) = conf_get("app-layer.protocols.pgsql.max-tx") {
857
0
            if let Ok(v) = val.parse::<usize>() {
858
0
                PGSQL_MAX_TX = v;
859
0
            } else {
860
0
                SCLogError!("Invalid value for pgsql.max-tx");
861
            }
862
34
        }
863
0
    } else {
864
0
        SCLogDebug!("Protocol detector and parser disabled for PGSQL.");
865
0
    }
866
34
}
867
868
#[cfg(test)]
mod test {
    use super::*;

    /// Each buffer below must be accepted by `probe_tc`.
    #[test]
    fn test_response_probe() {
        /* Authentication Request MD5 password salt value f211a3ed */
        let md5_auth: &[u8] = &[
            0x52, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x05, 0xf2, 0x11, 0xa3, 0xed,
        ];

        /* R  8 -- Authentication Cleartext */
        let cleartext_auth: &[u8] = &[0x52, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x03];

        let auth_with_signature: &[u8] = &[
            /* R */ 0x52, /* 54 */ 0x00, 0x00, 0x00, 0x36, /* 12 */ 0x00, 0x00,
            0x00, 0x0c, /* signature */ 0x76, 0x3d, 0x64, 0x31, 0x50, 0x58, 0x61, 0x38, 0x54,
            0x4b, 0x46, 0x50, 0x5a, 0x72, 0x52, 0x33, 0x4d, 0x42, 0x52, 0x6a, 0x4c, 0x79, 0x33,
            0x2b, 0x4a, 0x36, 0x79, 0x78, 0x72, 0x66, 0x77, 0x2f, 0x7a, 0x7a, 0x70, 0x38, 0x59,
            0x54, 0x39, 0x65, 0x78, 0x56, 0x37, 0x73, 0x38, 0x3d,
        ];

        /* S   26 -- parameter status application_name psql*/
        let parameter_status: &[u8] = &[
            0x53, 0x00, 0x00, 0x00, 0x1a, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69,
            0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x00, 0x70, 0x73, 0x71, 0x6c, 0x00,
        ];

        let buffers = [md5_auth, cleartext_auth, auth_with_signature, parameter_status];
        for buf in buffers.iter() {
            assert!(probe_tc(buf));
        }
    }

    /// Parsing an SSL request moves the state to `SSLRequestReceived`.
    #[test]
    fn test_request_events() {
        let mut state = PgsqlState::new();
        // an SSL Request
        let ssl_request: &[u8] = &[0x00, 0x00, 0x00, 0x08, 0x04, 0xd2, 0x16, 0x2f];
        let _ = state.parse_request(std::ptr::null_mut(), ssl_request);

        assert_eq!(state.state_progress, PgsqlStateProgress::SSLRequestReceived);

        // TODO add test for startup request
    }

    /// Feeding growing prefixes of an SSL request reports how much data is needed.
    #[test]
    fn test_incomplete_request() {
        let mut state = PgsqlState::new();
        // An SSL Request
        let buf: &[u8] = &[0x00, 0x00, 0x00, 0x08, 0x04, 0xd2, 0x16, 0x2f];

        // (prefix length, expected parser result), fed in this order to the same state.
        let expectations = [
            (
                0usize,
                AppLayerResult {
                    status: 0,
                    consumed: 0,
                    needed: 0,
                },
            ),
            (
                1,
                AppLayerResult {
                    status: 1,
                    consumed: 0,
                    needed: 2,
                },
            ),
            (
                2,
                AppLayerResult {
                    status: 1,
                    consumed: 0,
                    needed: 3,
                },
            ),
        ];

        for (prefix_len, expected) in expectations.iter() {
            let r = state.parse_request(std::ptr::null_mut(), &buf[..*prefix_len]);
            assert_eq!(r, *expected);
        }
    }

    /// `find_or_create_tx` yields nothing in `UnknownState`, and a transaction afterwards.
    #[test]
    fn test_find_or_create_tx() {
        let mut state = PgsqlState::new();

        state.state_progress = PgsqlStateProgress::UnknownState;
        assert!(state.find_or_create_tx().is_none());

        state.state_progress = PgsqlStateProgress::IdleState;
        assert!(state.find_or_create_tx().is_some());

        // Now, even though there isn't a new transaction created, the previous one is available
        state.state_progress = PgsqlStateProgress::SSLRejectedReceived;
        let reused = state.find_or_create_tx();
        assert!(reused.is_some());
        assert_eq!(reused.unwrap().tx_id, 1);
    }

    /// The row counter starts at zero and grows by one per `incr_row_cnt` call.
    #[test]
    fn test_row_cnt() {
        let mut tx = PgsqlTransaction::new();
        assert_eq!(tx.get_row_cnt(), 0);

        tx.incr_row_cnt();
        assert_eq!(tx.get_row_cnt(), 1);
    }
}