How to programmatically execute Impala queries on EMR and write the results to a CSV file?
I recently had to analyze a big chunk of data and decided to use Impala on Amazon Web Services (AWS) Elastic MapReduce (EMR). EMR has a nice web UI for creating a cluster that you can afterwards connect to and run your queries on. However, stepping through the UI takes a while, and I had to recreate the cluster every day to avoid paying for it 24/7. Since I had good experiences accessing AWS APIs through boto, I wanted to script both the cluster setup and the query execution. In addition, I wanted to write the results of the queries to CSV files and store them on S3.
As it turned out, this was a bit more complicated than I thought, which is why I will explain the steps in the remainder of this post.
- Define your AWS credentials, S3 bucket, and key path for your CSV result file.

Note: This is just an example. You should never store your AWS credentials in a source repository. Use environment variables or config files instead!

```python
import boto.emr
from boto.s3.key import Key
from boto.s3.connection import S3Connection

aws_key = 'your-key'
aws_secret = 'your-secret'
bucket_name = 'your-bucket-name'
result_file_name = 'your-result-file-path'
```
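To make that advice concrete, here is a minimal sketch that reads the credentials from environment variables instead; the variable names follow the common AWS convention but are otherwise my own choice:

```python
import os

# Read credentials from the environment instead of hard-coding them.
aws_key = os.environ['AWS_ACCESS_KEY_ID']
aws_secret = os.environ['AWS_SECRET_ACCESS_KEY']
```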
- Establish a connection to this S3 bucket.
```python
s3_connection = S3Connection(aws_key, aws_secret, validate_certs=False)
bucket = s3_connection.get_bucket(bucket_name)
```
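Note that `get_bucket` raises an error if the bucket does not exist yet. If that can happen in your setup, a small guard is possible (a sketch using boto's `lookup` and `create_bucket` calls):

```python
# lookup() returns None instead of raising when the bucket is missing.
bucket = s3_connection.lookup(bucket_name)
if bucket is None:
    bucket = s3_connection.create_bucket(bucket_name)
```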
- Create a query file written in Impala SQL and upload it to S3.
```python
query = Key(bucket)
query.key = '{}.sql'.format(result_file_name)
query.set_contents_from_string('''
SELECT * FROM my_table;
''')
```
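If you keep your queries in local `.sql` files rather than inline strings, boto can upload those directly; `my_query.sql` below is a hypothetical path:

```python
# Upload an existing local query file instead of an inline string.
query = Key(bucket)
query.key = '{}.sql'.format(result_file_name)
query.set_contents_from_filename('my_query.sql')
```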
- Create a query runner script that executes the query on EMR and upload it to S3. The query file from the previous step is passed in as the first argument and downloaded from S3 to the local machine. The query is then executed with impala-shell and the result is written to a CSV-formatted file. Finally, the result is uploaded to S3; to avoid an error in case the output file already exists, it is deleted beforehand.
```python
query_runner = Key(bucket)
query_runner.key = '{}.runner'.format(result_file_name)
query_runner.set_contents_from_string('''
REMOTE_QUERY_FILE=$1
LOCAL_QUERY_FILE=./impala.query
LOCAL_OUTPUT_FILE=./output.csv
S3_OUTPUT_FILE=s3://{}/{}.csv

hadoop fs -copyToLocal $REMOTE_QUERY_FILE $LOCAL_QUERY_FILE
impala-shell --query_file $LOCAL_QUERY_FILE \
             --output_file $LOCAL_OUTPUT_FILE \
             --print_header \
             --delimited \
             --output_delimiter=','
hadoop fs -rm $S3_OUTPUT_FILE
hadoop fs -mkdir -p $(dirname $S3_OUTPUT_FILE)
hadoop fs -copyFromLocal $LOCAL_OUTPUT_FILE $S3_OUTPUT_FILE
'''.format(bucket.name, result_file_name))
```
- Generate an EMR step that executes the query runner script with the query file as its parameter.
```python
emr_steps = [
    boto.emr.step.ScriptRunnerStep(
        name='Run query {}'.format(result_file_name),
        step_args=[
            's3://{}/{}'.format(bucket.name, query_runner.key),
            's3://{}/{}'.format(bucket.name, query.key)
        ]
    )
]
```
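The same pattern extends to several queries in one cluster run: one `ScriptRunnerStep` per query file. A sketch, assuming you uploaded one runner per query (the runner embeds the output path, so each query needs its own runner key; the key names below are hypothetical):

```python
# Hypothetical: a list of (runner_key, query_key) pairs, one per query.
uploads = [('daily.runner', 'daily.sql'), ('weekly.runner', 'weekly.sql')]

emr_steps = [
    boto.emr.step.ScriptRunnerStep(
        name='Run query {}'.format(query_key),
        step_args=[
            's3://{}/{}'.format(bucket.name, runner_key),
            's3://{}/{}'.format(bucket.name, query_key),
        ],
    )
    for runner_key, query_key in uploads
]
```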
- Connect to EMR.
```python
from boto.regioninfo import RegionInfo

def emr_region(name):
    # Build the RegionInfo object boto expects; the endpoint follows
    # boto's built-in EMR region naming scheme.
    return RegionInfo(name=name,
                      endpoint='elasticmapreduce.{}.amazonaws.com'.format(name))

emr_connection = boto.emr.connection.EmrConnection(
    aws_key, aws_secret,
    region=emr_region('us-east-1'),
)
```
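Alternatively, boto ships a convenience function that looks up the region for you, which makes the helper above unnecessary:

```python
# Equivalent connection using boto's built-in region lookup.
emr_connection = boto.emr.connect_to_region(
    'us-east-1',
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)
```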
- Start a new EMR cluster and execute the generated EMR step.
```python
# run_jobflow() starts the cluster and returns the new job flow's id.
jobflow_id = emr_connection.run_jobflow(
    name='Cluster name',
    log_uri='s3://{}/logs'.format(bucket.name),
    ec2_keyname='my_key',
    master_instance_type='m3.xlarge',
    slave_instance_type='m3.2xlarge',
    num_instances=2,
    action_on_failure='CANCEL_AND_WAIT',
    keep_alive=True,
    ami_version='3.3.1',
    bootstrap_actions=[
        boto.emr.BootstrapAction(
            'Install Impala',
            's3://us-east-1.elasticmapreduce/libs/impala/setup-impala',
            [
                '--install-impala',
                '--base-path', 's3://us-east-1.elasticmapreduce',
                '--impala-version', 'latest',
                'IMPALA_BACKEND_PORT=22001',
                'IMPALA_MEM_LIMIT=90%',
            ]
        )
    ],
    steps=emr_steps,
)
```
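The returned job flow id can then be used to wait for the step to finish and pull the CSV down from S3. A minimal sketch; the 60-second poll interval and the local file name `result.csv` are my own choices:

```python
import time

# With keep_alive=True the cluster transitions to WAITING once all steps
# have finished; FAILED/TERMINATED indicate that something went wrong.
while emr_connection.describe_jobflow(jobflow_id).state not in (
        'WAITING', 'COMPLETED', 'FAILED', 'TERMINATED'):
    time.sleep(60)

# Download the CSV that the runner script uploaded to S3.
result = Key(bucket)
result.key = '{}.csv'.format(result_file_name)
result.get_contents_to_filename('result.csv')
```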
You can find the entire script here.
Suggestions, comments, critique? Please drop me a note. Feedback is always appreciated!