diff --git a/examples/GetUserSPNs.py b/examples/GetUserSPNs.py index c8584b9814..a6f93b0012 100755 --- a/examples/GetUserSPNs.py +++ b/examples/GetUserSPNs.py @@ -51,6 +51,8 @@ from impacket.smbconnection import SMBConnection, SessionError from impacket.ntlm import compute_lmhash, compute_nthash +import ldap3 + class GetUserSPNs: @staticmethod @@ -86,10 +88,12 @@ def __init__(self, username, password, user_domain, target_domain, cmdLineOption self.__requestTGS = cmdLineOptions.request # [!] in this script the value of -dc-ip option is self.__kdcIP and the value of -dc-host option is self.__kdcHost self.__kdcIP = cmdLineOptions.dc_ip + self.__root = '' self.__kdcHost = cmdLineOptions.dc_host self.__saveTGS = cmdLineOptions.save self.__requestUser = cmdLineOptions.request_user self.__stealth = cmdLineOptions.stealth + self.__ldap_channel_binding = cmdLineOptions.ldap_channel_binding if cmdLineOptions.hashes is not None: self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':') @@ -248,6 +252,74 @@ def outputTGS(self, ticket, oldSessionKey, sessionKey, username, spn, fd=None): except Exception as e: logging.error(str(e)) + def get_ldap_connection(self, protocol='ldap', valid=False): + if self.__doKerberos is True: + try: + ldapConnection = ldap.LDAPConnection('%s://%s' % (protocol, self.__target), self.baseDN, self.__kdcIP) + ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, + self.__nthash, + self.__aesKey, kdcHost=self.__kdcIP) + return ldapConnection + except ldap.LDAPSessionError as e: + if str(e).find('strongerAuthRequired') >= 0 and protocol != 'ldaps': + # We need to try SSL + return self.get_ldap_connection(protocol='ldaps') + else: + if self.__kdcIP is not None and self.__kdcHost is not None: + logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They " + "must match exactly each other") + raise + + ldapUser = '%s\\%s' % (self.__targetDomain, self.__username) + if self.__ldap_channel_binding: + if not hasattr(ldap3, 'TLS_CHANNEL_BINDING'): + raise Exception("To use LDAP channel binding, install the patched ldap3 module: pip3 install git+https://github.com/ly4k/ldap3") + import ssl + version=ssl.PROTOCOL_TLSv1_2 + tls = ldap3.Tls(validate=ssl.CERT_NONE, version=version, ciphers='ALL:@SECLEVEL=0') + server = ldap3.Server('ldaps://%s' % self.__target, use_ssl=True, get_info=ldap3.ALL, tls=tls) + else: + server = ldap3.Server('%s://%s' % (protocol, self.__target), get_info=ldap3.ALL) + + if self.__ldap_channel_binding: + logging.debug('Authenticating to LDAP server with LDAPS + channel binding') + channel_binding = {"channel_binding": ldap3.TLS_CHANNEL_BINDING} + connection = ldap3.Connection(server,user=ldapUser,password=self.__password, + authentication=ldap3.NTLM,auto_referrals=False, + **channel_binding) + elif protocol == 'ldaps': + logging.debug('Authenticating to LDAP server with LDAPS') + connection = ldap3.Connection(server,user=ldapUser,password=self.__password, + authentication=ldap3.NTLM,auto_referrals=False) + else: # Basic LDAP + logging.debug('Authenticating to LDAP server with LDAP') + connection = ldap3.Connection(server, user=ldapUser, password=self.__password, + authentication=ldap3.NTLM) + if not connection.bind(): + if connection.result['result'] == ldap3.core.results.RESULT_INVALID_CREDENTIALS and valid is True\ + and protocol == 'ldaps' and self.__ldap_channel_binding is False: + logging.warning('Authentication failed with LDAPS, trying with channel binding') + self.__ldap_channel_binding = True + return self.get_ldap_connection(protocol='ldaps', valid=valid) + elif connection.result['result'] == ldap3.core.results.RESULT_STRONGER_AUTH_REQUIRED: + if protocol == 'ldaps' and self.__ldap_channel_binding is False: + logging.warning('Authentication failed with LDAPS, trying with channel binding') + self.__ldap_channel_binding = True + return self.get_ldap_connection(protocol='ldaps', valid=valid) + if protocol == 'ldap': + # setting this because we would not have received RESULT_STRONGER_AUTH_REQUIRED + # if the credentials were invalid. This is important because a lack of ldap channel binding + # in an environment that enforces it will often show up as a (misnomer) error RESULT_INVALID_CREDENTIALS + valid = True + logging.warning('Authentication failed with LDAP, trying LDAPS') + return self.get_ldap_connection(protocol='ldaps', valid=valid) + else: + raise Exception('Failed to authenticate. Error: %s' + % ldap3.core.results.RESULT_CODES[connection.result['result']]) + logging.info('Successfully authenticated') + self.__root = server.info.other['defaultNamingContext'][0] + return connection + def run(self): if self.__usersFile: self.request_users_file_TGSs() @@ -266,33 +338,9 @@ def run(self): self.__target = self.getMachineName(self.__target) # Connect to LDAP - try: - ldapConnection = ldap.LDAPConnection('ldap://%s' % self.__target, self.baseDN, self.__kdcIP) - if self.__doKerberos is not True: - ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash) - else: - ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, - self.__nthash, - self.__aesKey, kdcHost=self.__kdcIP) - except ldap.LDAPSessionError as e: - if str(e).find('strongerAuthRequired') >= 0: - # We need to try SSL - ldapConnection = ldap.LDAPConnection('ldaps://%s' % self.__target, self.baseDN, self.__kdcIP) - if self.__doKerberos is not True: - ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash) - else: - ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash, - self.__nthash, - self.__aesKey, kdcHost=self.__kdcIP) - else: - if str(e).find('NTLMAuthNegotiate') >= 0: - logging.critical("NTLM negotiation failed. Probably NTLM is disabled. Try to use Kerberos " - "authentication instead.") - else: - if self.__kdcIP is not None and self.__kdcHost is not None: - logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They " - "must match exactly each other") - raise + connection = self.get_ldap_connection() + + # Building the search filter filter_spn = "servicePrincipalName=*" @@ -313,6 +361,58 @@ def run(self): searchFilter += ')' + + if isinstance(connection, ldap.LDAPConnection): + # python-ldap method, for Kerberos + answers = self.get_answers_ldap(connection, searchFilter) + else: + # ldap3 method, for everything else + answers = self.get_answers_ldap3(connection, searchFilter) + + + if len(answers) > 0: + self.printTable(answers, header=["ServicePrincipalName", "Name", "MemberOf", "PasswordLastSet", "LastLogon", + "Delegation"]) + print('\n\n') + + if self.__requestTGS is True or self.__requestUser is not None: + # Let's get unique user names and a SPN to request a TGS for + users = dict((vals[1], vals[0]) for vals in answers) + + # Get a TGT for the current user + TGT = self.getTGT() + + if self.__outputFileName is not None: + fd = open(self.__outputFileName, 'w+') + else: + fd = None + + for user, SPN in users.items(): + sAMAccountName = user + downLevelLogonName = self.__targetDomain + "\\" + sAMAccountName + + try: + principalName = Principal() + principalName.type = constants.PrincipalNameType.NT_MS_PRINCIPAL.value + principalName.components = [downLevelLogonName] + + tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(principalName, self.__domain, + self.__kdcIP, + TGT['KDC_REP'], TGT['cipher'], + TGT['sessionKey']) + self.outputTGS(tgs, oldSessionKey, sessionKey, sAMAccountName, + self.__targetDomain + "/" + sAMAccountName, fd) + except Exception as e: + logging.debug("Exception:", exc_info=True) + logging.error('Principal: %s - %s' % (downLevelLogonName, str(e))) + + if fd is not None: + fd.close() + + else: + print("No entries found!") + + def get_answers_ldap(self, ldapConnection, searchFilter): try: # Microsoft Active Directory set an hard limit of 1000 entries returned by any search paged_search_control = ldapasn1.SimplePagedResultsControl(criticality=True, size=1000) @@ -327,7 +427,6 @@ def run(self): # We should never reach this code as we use paged search now logging.debug('sizeLimitExceeded exception caught, giving up and processing the data received') resp = e.getAnswers() - pass else: raise @@ -379,49 +478,63 @@ def run(self): answers.append([spn, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation]) except Exception as e: logging.error('Skipping item, cannot process due to error %s' % str(e)) - pass + return answers + + def get_answers_ldap3(self, connection, searchFilter): + # Microsoft Active Directory set an hard limit of 1000 entries returned by any search + resp = connection.extend.standard.paged_search( + search_base = self.__root, + search_filter = searchFilter, + attributes=['servicePrincipalName', 'sAMAccountName', + 'pwdLastSet', 'MemberOf', 'userAccountControl', 'lastLogon'], + paged_size = 1000, + generator=False) - if len(answers) > 0: - self.printTable(answers, header=["ServicePrincipalName", "Name", "MemberOf", "PasswordLastSet", "LastLogon", - "Delegation"]) - print('\n\n') - - if self.__requestTGS is True or self.__requestUser is not None: - # Let's get unique user names and a SPN to request a TGS for - users = dict((vals[1], vals[0]) for vals in answers) - - # Get a TGT for the current user - TGT = self.getTGT() - - if self.__outputFileName is not None: - fd = open(self.__outputFileName, 'w+') + answers = [] + # filter out the resRefs and keep the Entries + entries = [ item for item in resp if item['type'] == 'searchResEntry' ] + logging.debug('Total records returned %d' % len(entries)) + + for item in entries: + mustCommit = False + sAMAccountName = '' + memberOf = '' + SPNs = [] + pwdLastSet = '' + userAccountControl = 0 + lastLogon = 'N/A' + delegation = '' + try: + if item['attributes'].get('sAMAccountName', None) is None: + continue else: - fd = None - - for user, SPN in users.items(): - sAMAccountName = user - downLevelLogonName = self.__targetDomain + "\\" + sAMAccountName - - try: - principalName = Principal() - principalName.type = constants.PrincipalNameType.NT_MS_PRINCIPAL.value - principalName.components = [downLevelLogonName] - - tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(principalName, self.__domain, - self.__kdcIP, - TGT['KDC_REP'], TGT['cipher'], - TGT['sessionKey']) - self.outputTGS(tgs, oldSessionKey, sessionKey, sAMAccountName, - self.__targetDomain + "/" + sAMAccountName, fd) - except Exception as e: - logging.debug("Exception:", exc_info=True) - logging.error('Principal: %s - %s' % (downLevelLogonName, str(e))) + sAMAccountName = item['attributes']['sAMAccountName'] + mustCommit = True + + userAccountControl = item['attributes']['userAccountControl'] + if item['attributes']['userAccountControl'] & UF_TRUSTED_FOR_DELEGATION: + delegation = 'unconstrained' + elif int(userAccountControl) & UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION: + delegation = 'constrained' + + # if the SPN is not a member of a group, this will throw an error + # so I just replace it with a string containing '(null)' because + # this does not get used in the SPN request anyways + memberOf = next(iter(item['attributes']['memberOf']), '(null)') + pwdLastSet = str(item['attributes']['pwdLastSet']) + lastLogon = str(item['attributes']['lastLogon']) + for spn in item['attributes']['servicePrincipalName']: + SPNs.append(spn) - if fd is not None: - fd.close() - - else: - print("No entries found!") + if mustCommit is True: + if userAccountControl & UF_ACCOUNTDISABLE: + logging.debug('Bypassing disabled account %s ' % sAMAccountName) + else: + for spn in SPNs: + answers.append([spn, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation]) + except Exception as e: + logging.error('Skipping item, cannot process due to error %s' % str(e)) + return answers def request_users_file_TGSs(self): @@ -510,6 +623,7 @@ def request_multiple_TGSs(self, usernames): group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') + group.add_argument('-ldap-channel-binding', action="store_true", help='Use LDAP channel binding') group.add_argument('-k', action="store_true", help='Use Kerberos authentication. Grabs credentials from ccache file ' '(KRB5CCNAME) based on target parameters. If valid credentials '