How to Create Dataframe from AWS Athena using Boto3 get_query_results method
get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?
If you try to add:
client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)
you will get the following error:
An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
You can obtain millions of rows if you fetch the result file directly from your S3 bucket (in the following example, into a Pandas DataFrame):
def obtain_data_from_s3(self): self.resource = boto3.resource('s3', region_name = self.region_name, aws_access_key_id = self.aws_access_key_id, aws_secret_access_key= self.aws_secret_access_key) response = self.resource \ .Bucket(self.bucket) \ .Object(key= self.folder + self.filename + '.csv') \ .get() return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
The self.filename can be:
self.filename = response['QueryExecutionId'] + ".csv"
because Athena names the output files after the QueryExecutionId. Below is all my code, which takes a query and returns a DataFrame with all the rows and columns.
import io
import time

import boto3
import pandas as pd


class QueryAthena:
    """Run an Athena query and load the complete result set into a DataFrame.

    get_query_results is capped at 1000 rows per call, so instead of paging
    through it this class reads the CSV file that Athena writes to the S3
    output location (named ``<QueryExecutionId>.csv``), which holds every row.
    """

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output = 's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        # NOTE(review): hard-coded credentials are a security risk -- prefer
        # an IAM role, an AWS profile, or environment variables.
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        """Submit the query for execution.

        Returns the start_query_execution response, or None if submission
        failed. (BUG FIX: the original returned `response` outside the try
        block, raising UnboundLocalError when start_query_execution failed.)
        """
        response = None
        try:
            self.client = boto3.client(
                'athena',
                region_name=self.region_name,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
            )
            response = self.client.start_query_execution(
                QueryString=q,
                QueryExecutionContext={'Database': self.database},
                ResultConfiguration={'OutputLocation': self.s3_output},
            )
            # Athena names its output file after the execution id.
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])
        except Exception as e:
            print(e)
        return response

    def run_query(self):
        """Execute the query, poll until it finishes, and return a DataFrame.

        Returns None if the query fails, is cancelled, or cannot be started.
        """
        res = self.load_conf(self.query)
        try:
            query_status = None
            while query_status in (None, 'QUEUED', 'RUNNING'):
                query_status = self.client.get_query_execution(
                    QueryExecutionId=res['QueryExecutionId']
                )['QueryExecution']['Status']['State']
                print(query_status)
                if query_status in ('FAILED', 'CANCELLED'):
                    raise Exception(
                        'Athena query with the string "{}" failed or was cancelled'.format(self.query)
                    )
                # BUG FIX: only sleep while the query is still in flight; the
                # original slept 10s even after the query had already succeeded.
                if query_status in ('QUEUED', 'RUNNING'):
                    time.sleep(10)
            print('Query "{}" finished.'.format(self.query))
            return self.obtain_data()
        except Exception as e:
            print(e)

    def obtain_data(self):
        """Fetch the result CSV from S3 and parse it into a DataFrame."""
        try:
            self.resource = boto3.resource(
                's3',
                region_name=self.region_name,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
            )
            response = (
                self.resource
                .Bucket(self.bucket)
                .Object(key=self.folder + self.filename + '.csv')
                .get()
            )
            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')
        except Exception as e:
            print(e)


if __name__ == "__main__":
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
I have a solution for my first question, using the following function
def results_to_df(results): columns = [ col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo'] ] listed_results = [] for res in results['ResultSet']['Rows'][1:]: values = [] for field in res['Data']: try: values.append(list(field.values())[0]) except: values.append(list(' ')) listed_results.append( dict(zip(columns, values)) ) return listed_results
and then:
t = results_to_df(response)
pd.DataFrame(t)
As for my 2nd question, and in response to the request of @EricBellet, I'm also adding my approach for pagination, which I find inefficient and slower compared to loading the results from the Athena output in S3:
def run_query(query, database, s3_output): ''' Function for executing Athena queries and return the query ID ''' client = boto3.client('athena') response = client.start_query_execution( QueryString=query, QueryExecutionContext={ 'Database': database }, ResultConfiguration={ 'OutputLocation': s3_output, } ) print('Execution ID: ' + response['QueryExecutionId']) return responsedef format_result(results): ''' This function format the results toward append in the needed format. ''' columns = [ col['Label'] for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo'] ] formatted_results = [] for result in results['ResultSet']['Rows'][0:]: values = [] for field in result['Data']: try: values.append(list(field.values())[0]) except: values.append(list(' ')) formatted_results.append( dict(zip(columns, values)) ) return formatted_resultsres = run_query(query_2, database, s3_ouput) #query Athenaimport sysimport boto3marker = Noneformatted_results = []query_id = res['QueryExecutionId']i = 0start_time = time.time()while True: paginator = client.get_paginator('get_query_results') response_iterator = paginator.paginate( QueryExecutionId=query_id, PaginationConfig={ 'MaxItems': 1000, 'PageSize': 1000, 'StartingToken': marker}) for page in response_iterator: i = i + 1 format_page = format_result(page) if i == 1: formatted_results = pd.DataFrame(format_page) elif i > 1: formatted_results = formatted_results.append(pd.DataFrame(format_page)) try: marker = page['NextToken'] except KeyError: breakprint ("My program took", time.time() - start_time, "to run")
It's not formatted very well, but I think it does the job...
You can use AWS Data Wrangler to create pandas data frame directly querying through Athena.
import awswrangler as wr

df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")
You can find more information here