Adding persistent MFA credentials in so you don’t have to keep putting in the virtual device 6 digit token with AWS SDK boto3

I’m working on a tool that parses through Amazon Web Services (AWS) and identifies which roles are used by which instances, since the AWS console doesn’t really do this very easily. I have an AWS account with MFA activated, so if I try to use the SDK, I have to validate the MFA before I can make any calls to the system. My last blog ‘Using MFA with AWS using Python and boto3got into the weeds of how to actually set up a way to input your MFA, but the problem at that point was that every time I ran the program, I had to input the MFA, so I started looking around for ways to store the credentials safely, so that they could be re-used if they hadn’t expired yet, because once you validate MFA it looked like to me the returned tokens were valid for 24 hours. So that said, I started chipping away at a way to persist the tokens so I only had to input the MFA if the token actually expired.

I went about this by making a class that instantiates when main.py kicks off.

Starting a new project using PyCharm, using Python 3.8.2 the folder structure to start with will look something like this:

main.py is the bulk of it, but to make life easy on us I’ve separated out the AWS STS items, and I made a class to keep all of the STS calls in tidy order.

Because we need to persist the data, there are two ways I found I could do this, storing it either in the computers memory using a tool called Redis, or creating a file and storing the data in a file. I felt like the memory storage option would be more challenging, and in a way safer because accessing data from memory directly is more difficult than finding some text file with your tokens. That just felt like a security code smell. I think I may work on encrypting the tokens as they get stored in memory, but for now this will do.

My only beef with using Redis is that you have to run a local server to access the data in memory. I was hoping there would be a way to write to memory directly without running something like a server, and there likely is, but that’s beyond my skill level at this point. I’ll get there soon enough.

Head to the Redis Download page and get the latest stable version of Redis (or if you’re feeling frisky try the pre-releases or unstable versions). If you’re on Mac you can use Homebrew, follow these instructions:

brew install redis

That gets the server installed, to start the server, I was able to do it two ways. Open up your favorite terminal, I use iTerm2, and use one of these commands:

redis-server /usr/local/etc/redis.conf

OR

brew services start redis

I kind of like the first option more because it actually gives you some info on the server that’s started, returning this:

                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.0.10 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

Starting this using brew services doesn’t give you this info.

Ok so we’ve got our persistent memory storage server started, let’s dive in to the code. To start we need to get Redis installed in our project, so go to the terminal and install it using

pip install redis

Now let’s get the server connection instantiated in our code:

import redis

redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

This gets the connection to server set up, using the default settings. Also, it’s important to add the charset and decode_responses as well as to use redis.StrictRedis() instead of redis.Redis() because this makes it so the get functions grabbing data from the server bring it in as proper strings instead of byte-strings in python. Otherwise you have to build a little function that decodes the utf-8 each time you want to get your keys from the Redis storage.

Now we need to start building our class that is going to kick off all the MFA nonsense. Because we’re going to be using boto3 at this point, we need to install it with pip ahead of time, or just add the code and right errors that pop on the import statement in PyCharm and import the needed packages. Installing boto3 will cover both boto3 and botocore.

import redis
import boto3
from botocore.client import BaseClient

redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')

Notice I also added that import line about the BaseClient, I try my best to make sure things are strictly typed in python because I have found it really helps to understand what needs to go where in the bigger picture, so adding this BaseClient import gives me the ability to define that in the class property stsClient.

So what’s going on here? First we add the class with class Sts(), then I wanted it to have an active client as a property when this instantiates, so doing that you have to use the __init__(self): and then define it as the property. This reaches out to your AWS config file and starts the STS client using your access keys we already set up in the previous blog (*more info in this blog*).

Now I knew I wanted the session tokens accessible as a dictionary property so I can use them to instantiate a boto3.session.Session() instance eventually, so I need to instantiate that property in the class, as well as find a way to populate it with the necessary MFA validated tokens to pass on to other boto3 instantiations.

I started by making a private class function called __getSessionToken(self), and I had it get whatever MFA credentials were stored in the RedisDB, then using that to check if the MFA validation had expired, and if so, to reissue new certs asking for the MFA. The first step was adding the self.sessionTokens property to the class, and calling the __getSessionToken() to populate it’s value. There’s going to be a lot going on in this code, but we’ll break it down.

import redis
import boto3
from botocore.client import BaseClient

redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')
        self.sessionTokens = self.__getSessionToken()

    def __getSessionToken(self) -> dict:
        Return {}

Ok so we’ve added lines 10, 12 and 13, for now it’s returning {}, an empty dictionary, until we build out the function. We’re going to need a way to get and set the MFA credentials in the redisDB, so that will be our next step.

import redis
import boto3
from botocore.client import BaseClient
from dateutil.parser import parse as DTparser

redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')
        self.sessionTokens = self.__getSessionToken()

    def __getSessionToken(self) -> dict:
        currentCredentials = self.__getRedisMFACredentials()

    def __setRedisMFACredentials(self, credentials: dict):
        redisDB.set('AccessKeyId', credentials['AccessKeyId'])
        redisDB.set('SecretAccessKey', credentials['SecretAccessKey'])
        redisDB.set('SessionToken', credentials['SessionToken'])
        redisDB.set('Expiration', str(credentials['Expiration']))

    def __getRedisMFACredentials(self) -> dict:
        AccessKeyId = redisDB.get('AccessKeyId')
        SecretAccessKey = redisDB.get('SecretAccessKey')
        SessionToken = redisDB.get('SessionToken')
        Expiration = DTparser(redisDB.get('Expiration'))
        Credentials: dict = {'AccessKeyId': AccessKeyId,
                             'SecretAccessKey': SecretAccessKey,
                             'SessionToken': SessionToken,
                             'Expiration': Expiration}
        return Credentials

In the code above, we create a variable called currentCredientials and set it equal to our private function __getRedisMFACredentials. We’ve created two functions, a get and set to assign the values of a dictionary to redisDB. I went with these dictionary keys because they are identical to what is returned when you actually make the call to AWS using the STS client, which we’ll get to in a little bit. Now, if you’re wondering, why did we write code to get MFA credentials we haven’t stored in our redisDB yet, you’re not crazy, that’s correct. We haven’t gotten the values to actually store in the redisDB quite yet, but we’ll build this out, then put our first set of values in using a little hacky method I had to use mostly because I couldn’t figure out another way to get the intial data set into redisDB any other way. Hopefully it makes sense as we continue to build out the app here.

One thing to note is the 'Expiration' value, and the DTParser module used. When we get the value back from STS, Expiration is a datetime value, so we need to turn it into a string to store it in the redisDB, otherwise you’ll get some errors, and then to turn it back into a datetime object, DTparser is able to do it straight from the string value we get back from redisDB. Then after it’s parsed back into a datetime object, we put it all into the Credentials dictionary and send it on it’s merry way.

Next we need a way to check if the MFA creds we got from redisDB have expired or not, so to do this, we’ll add another private functions, which I called __isMFATokenExpired(self).

import redis
import boto3
from botocore.client import BaseClient
from dateutil.parser import parse as DTparser


redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')

    def __getSessionToken(self) -> dict:
        currentCredentials = self.__getRedisMFACredentials()
        if (self.__isMFATokenExpired()):
            return {}
                
    def __isMFATokenExpired(self):
            return None

    def __setRedisMFACredentials(self, credentials: dict):
        redisDB.set('AccessKeyId', credentials['AccessKeyId'])
        redisDB.set('SecretAccessKey', credentials['SecretAccessKey'])
        redisDB.set('SessionToken', credentials['SessionToken'])
        redisDB.set('Expiration', str(credentials['Expiration']))

    def __getRedisMFACredentials(self) -> dict:
        AccessKeyId = redisDB.get('AccessKeyId')
        SecretAccessKey = redisDB.get('SecretAccessKey')
        SessionToken = redisDB.get('SessionToken')
        Expiration = DTparser(redisDB.get('Expiration'))
        Credentials: dict = {'AccessKeyId': AccessKeyId,
                                 'SecretAccessKey': SecretAccessKey,
                                 'SessionToken': SessionToken,
                                 'Expiration': Expiration}
        return Credentials


Now we’ve added the starter of a private method to check if the token has expired. Let’s focus just on __isMFATokenExpired():

import datetime
import pytz
...
...
    def __isMFATokenExpired(self):
        MFAExpirationDate = self.__getRedisMFACredentials()['Expiration']
        today = pytz.UTC.localize(datetime.datetime.now())
        if MFAExpirationDate < today:
            return True
        else:
            return False
...

Building out this function we have the MFAExpirationDate coming from grabbing JUST the value from the 'Expiration‘ key returned by __getRedisMFACredentials(). This returns a datetime value. We want to see if it’s expired to we’ll want to make sure that if today is greater than the expiration date, it has expired and it’ll return True, otherwise, it’ll be false. Note that we have to import datetime at the top of our stsAPI.py, and a new module called pytz. This is because date times have two different modes, offset-naive, and offset-aware. As best I can tell, this just means it either includes the timezone information, or it doesn’t. If it’s naive, it’s without the datetime information, and if it’s aware, it includes datetime. When you create an object representing the moment it was created with datetime.datetime.now(), it is offset-naive. Using pytz.UTC.localize() changes it to offset-aware. You can get more info on this in this stack overflow article. Otherwise your code will error out.

Next, if the MFA Token we get from redisDB has in fact expired, we need a way to re-up our tokens, by capturing both our 6-digit virtual authenticator device, and we’ll need the mfa_serial value from the ~/.aws/config file. In addition to that, I wanted a way to make sure that the value we input was actually 6 digits long. We’ll focus on __getSessionToken(), completing the code to make all the magic happen.

...
class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')
        self.sessionTokens = self.__getSessionToken()

    def __getSessionToken(self) -> dict:
        currentCredentials = self.__getRedisMFACredentials()
        if (self.__isMFATokenExpired()):
            mfa_token = input('Please enter your 6 digit MFA code:')
            mfa_serial = boto3.DEFAULT_SESSION._session.full_config['profiles']['default']['mfa_serial']
            if len(mfa_token) != 6:
                print('Your MFA token is not correct.', file=sys.stderr)
            sessionToken: dict = self.stsClient.get_session_token(SerialNumber=mfa_serial, TokenCode=mfa_token)
            updatedCredentials: dict = sessionToken['Credentials']
            self.__setRedisMFACredentials(updatedCredentials)
            return updatedCredentials
        else:
            return currentCredentials
...

Alright so building out the final bits of __getSessionToken() we’ve got it first checking if the token is expired with the function self.__isMFATokenExpired(). If it returns True it will then ask you to input your mfa_token from whatever authenticator app you’re using. Then it goes into the bowels of boto3 and retrieves the mfa_serial from using some trickery with the boto3 instance itself. Basically when I was in the debugger, I was able to find this value for us to use programmatically with the value boto3.DEFAULT_SESSION._session.full_config['profiles']['default']['mfa_serial']. This, unfortunately, is a bit of a no-no though, so because ‘_session‘ is a protected value, which is not really a best practice to have your program using protected values (at least that’s what I think). If anyone actually has a better way to get this information, please drop a comment and let me know. I actually opened a StackOverflow question on it, but as of the time of this writing, there hasn’t really been any valuable feedback on a better way.

With these values, you can then use the self.stsClient.get_session_token() to get new credentials with the MFA validated tokens, then spit those tokens into the redisDB for later use (using self.__setRedisMFACredentials(updatedCredentials)) so we don’t have to put in the MFA validation every single time we run our program, which was proving to be quite a pain in the butt after a while. Then it will return the credentials as a dictionary, and that sets self.sessionTokens.

Our final code looks like what we have below, and this will check our MFA, see if it’s expired, and then ask for new validation if it has expired, then we can use this to create a session in our main program with the MFA validated tokens in boto3


import datetime
import pytz
import redis
import boto3
from botocore.client import BaseClient
from dateutil.parser import parse as DTparser


redisDB = redis.StrictRedis(host='localhost', port=6379, db=0, charset="utf-8", decode_responses=True)

class Sts():
    def __init__(self):
        self.stsClient: BaseClient = boto3.client('sts')
        self.sessionTokens = self.__getSessionToken()

    def __getSessionToken(self) -> dict:
        currentCredentials = self.__getRedisMFACredentials()
        if (self.__isMFATokenExpired()):
            mfa_token = input('Please enter your 6 digit MFA code:')
            mfa_serial = boto3.DEFAULT_SESSION._session.full_config['profiles']['default']['mfa_serial']
            if len(mfa_token) != 6:
                print('Your MFA token is not correct.', file=sys.stderr)
            sessionToken: dict = self.stsClient.get_session_token(SerialNumber=mfa_serial, TokenCode=mfa_token)
            updatedCredentials: dict = sessionToken['Credentials']
            self.__setRedisMFACredentials(updatedCredentials)
            return updatedCredentials
        else:
            return currentCredentials

    def __isMFATokenExpired(self):
        MFAExpirationDate = self.__getRedisMFACredentials()['Expiration']
        today = pytz.UTC.localize(datetime.datetime.now())
        if MFAExpirationDate < today:
            return True
        else:
            return False

    def __setRedisMFACredentials(self, credentials: dict):
        redisDB.set('AccessKeyId', credentials['AccessKeyId'])
        redisDB.set('SecretAccessKey', credentials['SecretAccessKey'])
        redisDB.set('SessionToken', credentials['SessionToken'])
        redisDB.set('Expiration', str(credentials['Expiration']))

    def __getRedisMFACredentials(self) -> dict:
        AccessKeyId = redisDB.get('AccessKeyId')
        SecretAccessKey = redisDB.get('SecretAccessKey')
        SessionToken = redisDB.get('SessionToken')
        Expiration = DTparser(redisDB.get('Expiration'))
        Credentials: dict = {'AccessKeyId': AccessKeyId,
                             'SecretAccessKey': SecretAccessKey,
                             'SessionToken': SessionToken,
                             'Expiration': Expiration}
        return Credentials

Using MFA with AWS using Python and boto3

Alright, so in the journey of figuring things out the hard way, we’ll just get right into it. I’ve been working on finding a way in AWS to identify which service/resource instances are using what roles, so that we can start eliminating unused roles. AWS does a decent job of providing information, but there’s still a bit of a guessing game. So to solve this, I’ve started writing a little program in python3 that can hunt through the roles and identify which roles are using the roles, if ever. In addition to this, my company has MFA activated, so connecting to AWS requires the right tokens to even start making queries about roles and whatever else.

That said, let’s just get right into how to get the AWS SDK boto3 to connect to AWS using MFA with a virtual device, like Google Authenticator, or Twilio’s Authy app. This took me a bit to figure out because I’m a noob, so I’m writing this down in hopes of it helping other noobs out too.

The Python SDK to connect to AWS is called boto3. This has everything you’d need to connect and use AWS services from the CLI level, but using Python. We’ll start from the very beginning assuming you’ve never used any of this stuff before.

Continue reading

For AWS ARN omission syntax format, what do the colons mean next to each other?

AWS introduction page

Ok so I’ve been learning AWS, in particular the Identity and Access Management (IAM) service. It’s hefty to say the least. One of the things that has been confusing is the policy JSON objects that can be used to allow or deny access or actions within AWS. In particular the ARN designation.

I want to get in to what each section is, and when it’s necessary to omit resources, and what it even means when you’re looking at something that looks like this:

  • arn:aws:s3:::MyBucket
  • arn:aws:s3:::examplebucket/my-data/sales-export-2019-q4.json
  • arn:aws:s3:::examplebucket/my-data/sales-export-????-q?.*

So first of all, there’s a general ARN layout that looks something like this:

arn:partition:service:region:account-id:resource-id
arn:partition:service:region:account-id:resource-type/resource-id
arn:partition:service:region:account-id:resource-type:resource-id

Each chunk is separated by a colon, and in the examples of the S3 Bucket, you’ll see that there may be something like ::: in your ARN. This is because for some resources/services, various sections are allowed to be omitted. in the case of an S3 bucket, region and account-id can be omitted. This is different per resource, so S3 ARN rules are different than DynamoDB, which is different than EC2, and so on. You’ll want to look up the resources documentation to get the specifics on what’s allowed and what’s not, though sometimes it can still be a bit of a guessing game.

Seeing the colons next to each other was a bit confusing at first for me, but all it means is that those particular sections of an ARN can be omitted.