Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/msrpce/rpcclient.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

221 statements  

1# SPDX-License-Identifier: GPL-2.0-or-later 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7DCE/RPC client as per [MS-RPCE] 

8""" 

9 

10import uuid 

11import socket 

12 

13from scapy.config import conf 

14 

15from scapy.layers.dcerpc import ( 

16 DceRpc5, 

17 DceRpc5AlterContext, 

18 DceRpc5AlterContextResp, 

19 DceRpc5Auth3, 

20 DceRpc5Bind, 

21 DceRpc5BindAck, 

22 DceRpc5BindNak, 

23 DceRpc5Context, 

24 DceRpc5Fault, 

25 DceRpc5Request, 

26 DceRpc5Response, 

27 DceRpc5AbstractSyntax, 

28 DceRpc5TransferSyntax, 

29 DceRpcSocket, 

30 DCERPC_Transport, 

31 find_dcerpc_interface, 

32 CommonAuthVerifier, 

33 DCE_C_AUTHN_LEVEL, 

34 # NDR 

35 NDRPointer, 

36 NDRContextHandle, 

37) 

38from scapy.layers.gssapi import ( 

39 SSP, 

40 GSS_S_FAILURE, 

41 GSS_S_COMPLETE, 

42 GSS_S_CONTINUE_NEEDED, 

43 GSS_C_FLAGS, 

44) 

45from scapy.layers.smb2 import STATUS_ERREF 

46from scapy.layers.smbclient import ( 

47 SMB_RPC_SOCKET, 

48) 

49 

50# RPC 

51from scapy.layers.msrpce.ept import ( 

52 ept_map_Request, 

53 ept_map_Response, 

54 twr_p_t, 

55 protocol_tower_t, 

56 prot_and_addr_t, 

57 UUID, 

58) 

59 

60 

61class DCERPC_Client(object): 

62 """ 

63 A basic DCE/RPC client 

64 

65 :param ndr64: Should ask for NDR64 when binding (default False) 

66 """ 

67 

68 def __init__(self, transport, ndr64=False, ndrendian="little", verb=True, **kwargs): 

69 self.sock = None 

70 self.transport = transport 

71 assert isinstance( 

72 transport, DCERPC_Transport 

73 ), "transport must be from DCERPC_Transport" 

74 self.call_id = 0 

75 self.cont_id = 0 

76 self.ndr64 = ndr64 

77 self.ndrendian = ndrendian 

78 self.verb = verb 

79 self.auth_level = kwargs.pop("auth_level", DCE_C_AUTHN_LEVEL.NONE) 

80 self.auth_context_id = kwargs.pop("auth_context_id", 0) 

81 self.ssp = kwargs.pop("ssp", None) # type: SSP 

82 self.sspcontext = None 

83 self.dcesockargs = kwargs 

84 self.dcesockargs["transport"] = self.transport 

85 

86 @classmethod 

87 def from_smblink(cls, smbcli, smb_kwargs={}, **kwargs): 

88 """ 

89 Build a DCERPC_Client from a SMB_Client.smblink directly 

90 """ 

91 client = DCERPC_Client(DCERPC_Transport.NCACN_NP, **kwargs) 

92 sock = client.smbrpcsock = SMB_RPC_SOCKET(smbcli, **smb_kwargs) 

93 client.sock = DceRpcSocket( 

94 sock, 

95 DceRpc5, 

96 ssp=client.ssp, 

97 auth_level=client.auth_level, 

98 auth_context_id=client.auth_context_id, 

99 **client.dcesockargs, 

100 ) 

101 return client 

102 

103 def connect(self, ip, port=None, timeout=5, smb_kwargs={}): 

104 """ 

105 Initiate a connection 

106 """ 

107 if port is None: 

108 if self.transport == DCERPC_Transport.NCACN_IP_TCP: # IP/TCP 

109 port = 135 

110 elif self.transport == DCERPC_Transport.NCACN_NP: # SMB 

111 port = 445 

112 else: 

113 raise ValueError( 

114 "Can't guess the port for transport: %s" % self.transport 

115 ) 

116 sock = socket.socket() 

117 sock.settimeout(timeout) 

118 if self.verb: 

119 print( 

120 "\u2503 Connecting to %s on port %s via %s..." 

121 % (ip, port, repr(self.transport)) 

122 ) 

123 sock.connect((ip, port)) 

124 if self.verb: 

125 print( 

126 conf.color_theme.green( 

127 "\u2514 Connected from %s" % repr(sock.getsockname()) 

128 ) 

129 ) 

130 if self.transport == DCERPC_Transport.NCACN_NP: # SMB 

131 # We pack the socket into a SMB_RPC_SOCKET 

132 sock = self.smbrpcsock = SMB_RPC_SOCKET.from_tcpsock( 

133 sock, ssp=self.ssp, **smb_kwargs 

134 ) 

135 self.sock = DceRpcSocket(sock, DceRpc5, **self.dcesockargs) 

136 elif self.transport == DCERPC_Transport.NCACN_IP_TCP: 

137 self.sock = DceRpcSocket( 

138 sock, 

139 DceRpc5, 

140 ssp=self.ssp, 

141 auth_level=self.auth_level, 

142 auth_context_id=self.auth_context_id, 

143 **self.dcesockargs, 

144 ) 

145 

146 def close(self): 

147 if self.verb: 

148 print("X Connection closed\n") 

149 self.sock.close() 

150 

151 def sr1(self, pkt, **kwargs): 

152 self.call_id += 1 

153 pkt = ( 

154 DceRpc5( 

155 call_id=self.call_id, 

156 pfc_flags="PFC_FIRST_FRAG+PFC_LAST_FRAG", 

157 endian=self.ndrendian, 

158 auth_verifier=kwargs.pop("auth_verifier", None), 

159 ) 

160 / pkt 

161 ) 

162 if "pfc_flags" in kwargs: 

163 pkt.pfc_flags = kwargs.pop("pfc_flags") 

164 return self.sock.sr1(pkt, verbose=0, **kwargs) 

165 

166 def send(self, pkt, **kwargs): 

167 self.call_id += 1 

168 pkt = ( 

169 DceRpc5( 

170 call_id=self.call_id, 

171 pfc_flags="PFC_FIRST_FRAG+PFC_LAST_FRAG", 

172 endian=self.ndrendian, 

173 auth_verifier=kwargs.pop("auth_verifier", None), 

174 ) 

175 / pkt 

176 ) 

177 if "pfc_flags" in kwargs: 

178 pkt.pfc_flags = kwargs.pop("pfc_flags") 

179 return self.sock.send(pkt, **kwargs) 

180 

181 def sr1_req(self, pkt, **kwargs): 

182 if self.verb: 

183 print(conf.color_theme.opening(">> REQUEST: %s" % pkt.__class__.__name__)) 

184 # Send/receive 

185 resp = self.sr1( 

186 DceRpc5Request(cont_id=self.cont_id, alloc_hint=len(pkt)) / pkt, 

187 **kwargs, 

188 ) 

189 if DceRpc5Response in resp: 

190 if self.verb: 

191 print( 

192 conf.color_theme.success( 

193 "<< RESPONSE: %s" 

194 % (resp[DceRpc5Response].payload.__class__.__name__) 

195 ) 

196 ) 

197 return resp[DceRpc5Response].payload 

198 else: 

199 if self.verb: 

200 if DceRpc5Fault in resp: 

201 if resp[DceRpc5Fault].payload and not isinstance( 

202 resp[DceRpc5Fault].payload, conf.raw_layer 

203 ): 

204 resp[DceRpc5Fault].payload.show() 

205 if resp.status == 0x00000005: 

206 print(conf.color_theme.fail("! nca_s_fault_access_denied")) 

207 elif resp.status == 0x00000721: 

208 print( 

209 conf.color_theme.fail( 

210 "! nca_s_fault_sec_pkg_error " 

211 "(error in checksum/encryption)" 

212 ) 

213 ) 

214 else: 

215 print( 

216 conf.color_theme.fail( 

217 "! %s" % STATUS_ERREF.get(resp.status, "Failure") 

218 ) 

219 ) 

220 resp.show() 

221 return 

222 return resp 

223 

224 def get_bind_context(self, interface): 

225 return [ 

226 DceRpc5Context( 

227 cont_id=0, 

228 abstract_syntax=DceRpc5AbstractSyntax( 

229 if_uuid=interface.uuid, 

230 if_version=interface.if_version, 

231 ), 

232 transfer_syntaxes=[ 

233 DceRpc5TransferSyntax( 

234 # NDR 2.0 32-bit 

235 if_uuid="NDR 2.0", 

236 if_version=2, 

237 ) 

238 ], 

239 ), 

240 ] + ( 

241 [ 

242 DceRpc5Context( 

243 cont_id=1, 

244 abstract_syntax=DceRpc5AbstractSyntax( 

245 if_uuid=interface.uuid, 

246 if_version=interface.if_version, 

247 ), 

248 transfer_syntaxes=[ 

249 DceRpc5TransferSyntax( 

250 # NDR64 

251 if_uuid="NDR64", 

252 if_version=1, 

253 ) 

254 ], 

255 ), 

256 DceRpc5Context( 

257 cont_id=2, 

258 abstract_syntax=DceRpc5AbstractSyntax( 

259 if_uuid=interface.uuid, 

260 if_version=interface.if_version, 

261 ), 

262 transfer_syntaxes=[ 

263 DceRpc5TransferSyntax( 

264 if_uuid=uuid.UUID("6cb71c2c-9812-4540-0300-000000000000"), 

265 if_version=1, 

266 ) 

267 ], 

268 ), 

269 ] 

270 if self.ndr64 

271 else [] 

272 ) 

273 

274 def _bind(self, interface, reqcls, respcls): 

275 # Build a security context: [MS-RPCE] 3.3.1.5.2 

276 if self.verb: 

277 print( 

278 conf.color_theme.opening( 

279 ">> %s on %s" % (reqcls.__name__, interface) 

280 + (" (with %s)" % self.ssp.__class__.__name__ if self.ssp else "") 

281 ) 

282 ) 

283 if not self.ssp or ( 

284 self.transport == DCERPC_Transport.NCACN_NP 

285 and self.auth_level < DCE_C_AUTHN_LEVEL.PKT_INTEGRITY 

286 ): 

287 # NCACN_NP = SMB without INTEGRITY/PRIVACY does not bind the RPC securely, 

288 # again as it has already authenticated during the SMB Session Setup 

289 resp = self.sr1( 

290 reqcls(context_elem=self.get_bind_context(interface)), 

291 auth_verifier=None, 

292 ) 

293 status = GSS_S_COMPLETE 

294 else: 

295 # Perform authentication 

296 self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( 

297 self.sspcontext, 

298 req_flags=( 

299 # SSPs need to be instantiated with some special flags 

300 # for DCE/RPC usages. 

301 GSS_C_FLAGS.GSS_C_DCE_STYLE 

302 | GSS_C_FLAGS.GSS_C_REPLAY_FLAG 

303 | GSS_C_FLAGS.GSS_C_SEQUENCE_FLAG 

304 | GSS_C_FLAGS.GSS_C_MUTUAL_FLAG 

305 | ( 

306 GSS_C_FLAGS.GSS_C_INTEG_FLAG 

307 if self.auth_level >= DCE_C_AUTHN_LEVEL.PKT_INTEGRITY 

308 else 0 

309 ) 

310 | ( 

311 GSS_C_FLAGS.GSS_C_CONF_FLAG 

312 if self.auth_level >= DCE_C_AUTHN_LEVEL.PKT_PRIVACY 

313 else 0 

314 ) 

315 ), 

316 ) 

317 if status not in [GSS_S_CONTINUE_NEEDED, GSS_S_COMPLETE]: 

318 # Authentication failed. 

319 self.sspcontext.clifailure() 

320 return False 

321 resp = self.sr1( 

322 reqcls(context_elem=self.get_bind_context(interface)), 

323 auth_verifier=( 

324 None 

325 if not self.sspcontext 

326 else CommonAuthVerifier( 

327 auth_type=self.ssp.auth_type, 

328 auth_level=self.auth_level, 

329 auth_context_id=self.auth_context_id, 

330 auth_value=token, 

331 ) 

332 ), 

333 pfc_flags=( 

334 "PFC_FIRST_FRAG+PFC_LAST_FRAG" 

335 + ( 

336 # If the SSP supports "Header Signing", advertise it 

337 "+PFC_SUPPORT_HEADER_SIGN" 

338 if self.ssp is not None 

339 and self.sock.session.support_header_signing 

340 else "" 

341 ) 

342 ), 

343 ) 

344 if respcls not in resp: 

345 token = None 

346 status = GSS_S_FAILURE 

347 else: 

348 # Call the underlying SSP 

349 self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( 

350 self.sspcontext, 

351 token=resp.auth_verifier.auth_value, 

352 ) 

353 if status in [GSS_S_CONTINUE_NEEDED, GSS_S_COMPLETE]: 

354 # Authentication should continue, in two ways: 

355 # - through DceRpc5Auth3 (e.g. NTLM) 

356 # - through DceRpc5AlterContext (e.g. Kerberos) 

357 if token and self.ssp.LegsAmount(self.sspcontext) % 2 == 1: 

358 # AUTH 3 for certain SSPs (e.g. NTLM) 

359 # "The server MUST NOT respond to an rpc_auth_3 PDU" 

360 self.send( 

361 DceRpc5Auth3(), 

362 auth_verifier=CommonAuthVerifier( 

363 auth_type=self.ssp.auth_type, 

364 auth_level=self.auth_level, 

365 auth_context_id=self.auth_context_id, 

366 auth_value=token, 

367 ), 

368 ) 

369 status = GSS_S_COMPLETE 

370 else: 

371 while token: 

372 respcls = DceRpc5AlterContextResp 

373 resp = self.sr1( 

374 DceRpc5AlterContext( 

375 context_elem=self.get_bind_context(interface) 

376 ), 

377 auth_verifier=CommonAuthVerifier( 

378 auth_type=self.ssp.auth_type, 

379 auth_level=self.auth_level, 

380 auth_context_id=self.auth_context_id, 

381 auth_value=token, 

382 ), 

383 ) 

384 if respcls not in resp: 

385 status = GSS_S_FAILURE 

386 break 

387 if resp.auth_verifier is None: 

388 status = GSS_S_COMPLETE 

389 break 

390 self.sspcontext, token, status = self.ssp.GSS_Init_sec_context( 

391 self.sspcontext, 

392 token=resp.auth_verifier.auth_value, 

393 ) 

394 # Check context acceptance 

395 if ( 

396 status == GSS_S_COMPLETE 

397 and respcls in resp 

398 and any(x.result == 0 for x in resp.results[: int(self.ndr64) + 1]) 

399 ): 

400 self.call_id = 0 # reset call id 

401 port = resp.sec_addr.port_spec.decode() 

402 ndr = self.sock.session.ndr64 and "NDR64" or "NDR32" 

403 self.cont_id = int(self.sock.session.ndr64) # ctx 0 for NDR32, 1 for NDR64 

404 if self.verb: 

405 print( 

406 conf.color_theme.success( 

407 f"<< {respcls.__name__} port '{port}' using {ndr}" 

408 ) 

409 ) 

410 self.sock.session.sspcontext = self.sspcontext 

411 return True 

412 else: 

413 if self.verb: 

414 if DceRpc5BindNak in resp: 

415 err_msg = resp.sprintf( 

416 "reject_reason: %DceRpc5BindNak.provider_reject_reason%" 

417 ) 

418 print(conf.color_theme.fail("! Bind_nak (%s)" % err_msg)) 

419 if DceRpc5BindNak in resp: 

420 if resp[DceRpc5BindNak].payload and not isinstance( 

421 resp[DceRpc5BindNak].payload, conf.raw_layer 

422 ): 

423 resp[DceRpc5BindNak].payload.show() 

424 elif DceRpc5Fault in resp: 

425 if resp.status == 0x00000005: 

426 print(conf.color_theme.fail("! nca_s_fault_access_denied")) 

427 elif resp.status == 0x00000721: 

428 print( 

429 conf.color_theme.fail( 

430 "! nca_s_fault_sec_pkg_error " 

431 "(error in checksum/encryption)" 

432 ) 

433 ) 

434 else: 

435 print( 

436 conf.color_theme.fail( 

437 "! %s" % STATUS_ERREF.get(resp.status, "Failure") 

438 ) 

439 ) 

440 resp.show() 

441 if DceRpc5Fault in resp: 

442 if resp[DceRpc5Fault].payload and not isinstance( 

443 resp[DceRpc5Fault].payload, conf.raw_layer 

444 ): 

445 resp[DceRpc5Fault].payload.show() 

446 else: 

447 print(conf.color_theme.fail("! Failure")) 

448 resp.show() 

449 return False 

450 

451 def bind(self, interface): 

452 """ 

453 Bind the client to an interface 

454 """ 

455 return self._bind(interface, DceRpc5Bind, DceRpc5BindAck) 

456 

457 def alter_context(self, interface): 

458 """ 

459 Alter context: post-bind context negotiation 

460 """ 

461 return self._bind(interface, DceRpc5AlterContext, DceRpc5AlterContextResp) 

462 

463 def bind_or_alter(self, interface): 

464 """ 

465 Bind the client to an interface or alter the context if already bound 

466 """ 

467 if not self.sock.session.rpc_bind_interface: 

468 # No interface is bound 

469 self.bind(interface) 

470 else: 

471 # An interface is already bound 

472 self.alter_context(interface) 

473 

474 def open_smbpipe(self, name): 

475 """ 

476 Open a certain filehandle with the SMB automaton 

477 """ 

478 self.ipc_tid = self.smbrpcsock.tree_connect("IPC$") 

479 self.smbrpcsock.open_pipe(name) 

480 

481 def close_smbpipe(self): 

482 """ 

483 Close the previously opened pipe 

484 """ 

485 self.smbrpcsock.set_TID(self.ipc_tid) 

486 self.smbrpcsock.close_pipe() 

487 self.smbrpcsock.tree_disconnect() 

488 

489 def connect_and_bind( 

490 self, 

491 ip, 

492 interface, 

493 port=None, 

494 timeout=5, 

495 smb_kwargs={}, 

496 ): 

497 """ 

498 Asks the Endpoint Mapper what address to use to connect to the interface, 

499 then uses connect() followed by a bind() 

500 """ 

501 if self.transport == DCERPC_Transport.NCACN_IP_TCP: 

502 # IP/TCP 

503 # 1. ask the endpoint mapper (port 135) for the IP:PORT 

504 endpoints = get_endpoint( 

505 ip, 

506 interface, 

507 ndrendian=self.ndrendian, 

508 verb=self.verb, 

509 ) 

510 if endpoints: 

511 ip, port = endpoints[0] 

512 else: 

513 return 

514 # 2. Connect to that IP:PORT 

515 self.connect(ip, port=port) 

516 elif self.transport == DCERPC_Transport.NCACN_NP: 

517 # SMB 

518 # 1. ask the endpoint mapper (over SMB) for the namedpipe 

519 endpoints = get_endpoint( 

520 ip, 

521 interface, 

522 transport=self.transport, 

523 ndrendian=self.ndrendian, 

524 verb=self.verb, 

525 smb_kwargs=smb_kwargs, 

526 ) 

527 if endpoints: 

528 pipename = endpoints[0].lstrip("\\pipe\\") 

529 else: 

530 return 

531 # 2. connect to the SMB server 

532 self.connect(ip, port=port, timeout=timeout, smb_kwargs=smb_kwargs) 

533 # 3. open the new named pipe 

534 self.open_smbpipe(pipename) 

535 # Bind in RPC 

536 self.bind(interface) 

537 

538 def epm_map(self, interface): 

539 """ 

540 Calls ept_map (the EndPoint Manager) 

541 """ 

542 if self.ndr64: 

543 ndr_uuid = "NDR64" 

544 ndr_version = 1 

545 else: 

546 ndr_uuid = "NDR 2.0" 

547 ndr_version = 2 

548 pkt = self.sr1_req( 

549 ept_map_Request( 

550 obj=NDRPointer( 

551 referent_id=1, 

552 value=UUID( 

553 Data1=0, 

554 Data2=0, 

555 Data3=0, 

556 Data4=None, 

557 ), 

558 ), 

559 map_tower=NDRPointer( 

560 referent_id=2, 

561 value=twr_p_t( 

562 tower_octet_string=bytes( 

563 protocol_tower_t( 

564 floors=[ 

565 prot_and_addr_t( 

566 lhs_length=19, 

567 protocol_identifier=0xD, 

568 uuid=interface.uuid, 

569 version=interface.major_version, 

570 rhs_length=2, 

571 rhs=interface.minor_version, 

572 ), 

573 prot_and_addr_t( 

574 lhs_length=19, 

575 protocol_identifier=0xD, 

576 uuid=ndr_uuid, 

577 version=ndr_version, 

578 rhs_length=2, 

579 rhs=0, 

580 ), 

581 prot_and_addr_t( 

582 lhs_length=1, 

583 protocol_identifier="RPC connection-oriented protocol", # noqa: E501 

584 rhs_length=2, 

585 rhs=0, 

586 ), 

587 { 

588 DCERPC_Transport.NCACN_IP_TCP: ( 

589 prot_and_addr_t( 

590 lhs_length=1, 

591 protocol_identifier="NCACN_IP_TCP", 

592 rhs_length=2, 

593 rhs=135, 

594 ) 

595 ), 

596 DCERPC_Transport.NCACN_NP: ( 

597 prot_and_addr_t( 

598 lhs_length=1, 

599 protocol_identifier="NCACN_NP", 

600 rhs_length=2, 

601 rhs=b"0\x00", 

602 ) 

603 ), 

604 }[self.transport], 

605 { 

606 DCERPC_Transport.NCACN_IP_TCP: ( 

607 prot_and_addr_t( 

608 lhs_length=1, 

609 protocol_identifier="IP", 

610 rhs_length=4, 

611 rhs="0.0.0.0", 

612 ) 

613 ), 

614 DCERPC_Transport.NCACN_NP: ( 

615 prot_and_addr_t( 

616 lhs_length=1, 

617 protocol_identifier="NCACN_NB", 

618 rhs_length=10, 

619 rhs=b"127.0.0.1\x00", 

620 ) 

621 ), 

622 }[self.transport], 

623 ], 

624 ) 

625 ), 

626 ), 

627 ), 

628 entry_handle=NDRContextHandle( 

629 attributes=0, 

630 uuid=b"\x00" * 16, 

631 ), 

632 max_towers=500, 

633 ndr64=self.ndr64, 

634 ndrendian=self.ndrendian, 

635 ) 

636 ) 

637 if pkt and ept_map_Response in pkt: 

638 status = pkt[ept_map_Response].status 

639 # [MS-RPCE] sect 2.2.1.2.5 

640 if status == 0x00000000: 

641 towers = [ 

642 protocol_tower_t(x.value.tower_octet_string) 

643 for x in pkt[ept_map_Response].ITowers.value[0].value 

644 ] 

645 # Let's do some checks to know we know what we're doing 

646 endpoints = [] 

647 for t in towers: 

648 if t.floors[0].uuid != interface.uuid: 

649 if self.verb: 

650 print( 

651 conf.color_theme.fail( 

652 "! Server answered with a different interface." 

653 ) 

654 ) 

655 raise ValueError 

656 if t.floors[1].sprintf("%uuid%") != ndr_uuid: 

657 if self.verb: 

658 print( 

659 conf.color_theme.fail( 

660 "! Server answered with a different NDR version." 

661 ) 

662 ) 

663 raise ValueError 

664 if self.transport == DCERPC_Transport.NCACN_IP_TCP: 

665 endpoints.append((t.floors[4].rhs, t.floors[3].rhs)) 

666 elif self.transport == DCERPC_Transport.NCACN_NP: 

667 endpoints.append(t.floors[3].rhs.rstrip(b"\x00").decode()) 

668 return endpoints 

669 elif status == 0x16C9A0D6: 

670 if self.verb: 

671 pkt.show() 

672 print( 

673 conf.color_theme.fail( 

674 "! Server errored: 'There are no elements that satisfy" 

675 " the specified search criteria'." 

676 ) 

677 ) 

678 raise ValueError 

679 print(conf.color_theme.fail("! Failure.")) 

680 if pkt: 

681 pkt.show() 

682 raise ValueError("EPM Map failed") 

683 

684 

685def get_endpoint( 

686 ip, 

687 interface, 

688 transport=DCERPC_Transport.NCACN_IP_TCP, 

689 ndrendian="little", 

690 verb=True, 

691 smb_kwargs={}, 

692): 

693 """ 

694 Call the endpoint mapper on a remote IP to find an interface 

695 

696 :param ip: 

697 :param interface: 

698 :param mode: 

699 :param verb: 

700 

701 :return: a list of connection tuples for this interface 

702 """ 

703 client = DCERPC_Client( 

704 transport, 

705 ndr64=False, 

706 ndrendian=ndrendian, 

707 verb=verb, 

708 ) # EPM only works with NDR32 

709 client.connect(ip, smb_kwargs=smb_kwargs) 

710 if transport == DCERPC_Transport.NCACN_NP: # SMB 

711 client.open_smbpipe("epmapper") 

712 client.bind(find_dcerpc_interface("ept")) 

713 endpoints = client.epm_map(interface) 

714 client.close() 

715 return endpoints