28 January 2015

I recently had to analyze a big chunk of data and decided to use Impala on Amazon Web Services (AWS) Elastic MapReduce (EMR). EMR has a nice web UI for creating a cluster that you can then connect to and run your queries on. However, stepping through the UI takes a while, and I had to recreate the cluster every day to avoid paying for it 24/7. Since I had had good experiences accessing AWS APIs with boto, I wanted to script both the cluster setup and the query execution. In addition, I wanted to write the query results to CSV files and store them on S3.

As it turned out, this was a bit more complicated than I had thought, so I will explain the steps in the remainder of this post.

  1. Define your AWS credentials, the S3 bucket, and the key path for your CSV result file.
    Note: This is just an example; you should never store your AWS credentials in a source repository. Use environment variables or a config file instead, as sketched below!

    import boto.emr
    from boto.s3.key import Key
    from boto.s3.connection import S3Connection
    aws_key = 'your-key'
    aws_secret = 'your-secret'
    bucket_name = 'your-bucket-name'
    result_file_name = 'your-result-file-path'
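
    As the note says, the hard-coded keys are only for illustration. Here is a minimal sketch of reading them from environment variables instead; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the standard variable names that boto also picks up on its own:

    import os
    # Read the credentials from the environment instead of the source code.
    aws_key = os.environ['AWS_ACCESS_KEY_ID']
    aws_secret = os.environ['AWS_SECRET_ACCESS_KEY']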

  2. Establish a connection to this S3 bucket.

    s3_connection = S3Connection(aws_key, aws_secret, validate_certs=False)
    bucket = s3_connection.get_bucket(bucket_name)
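
    Note that get_bucket assumes the bucket already exists and raises an S3ResponseError otherwise. If the script should create the bucket on its first run, a small lookup-or-create variant like the following sketch works (lookup returns None for a missing bucket):

    # Create the bucket on the first run instead of failing.
    bucket = s3_connection.lookup(bucket_name) or s3_connection.create_bucket(bucket_name)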

  3. Create a query file containing the Impala SQL and upload it to S3.

    query = Key(bucket)
    query.key = '{}.sql'.format(result_file_name)
    query.set_contents_from_string('''
    SELECT * FROM my_table;
    ''')

  4. Create a query runner script that executes the query on EMR and upload it to S3. EMR passes the S3 path of the query file from the previous step to this script as its first argument. The script copies the query file from S3 to the local machine, executes it with impala-shell, and writes the result to a CSV-formatted file. Finally, the result is uploaded to S3; to avoid an error in case the output file already exists there, it is deleted beforehand.

    query_runner = Key(bucket)
    query_runner.key = '{}.runner'.format(result_file_name)
    query_runner.set_contents_from_string('''#!/bin/bash
    # The S3 path of the query file is passed in as the first argument.
    REMOTE_QUERY_FILE=$1
    LOCAL_QUERY_FILE=./impala.query
    LOCAL_OUTPUT_FILE=./output.csv
    S3_OUTPUT_FILE=s3://{}/{}.csv
    # Download the query file from S3 to the local machine.
    hadoop fs -copyToLocal $REMOTE_QUERY_FILE $LOCAL_QUERY_FILE
    # Execute the query and write the result to a local CSV file.
    impala-shell --query_file $LOCAL_QUERY_FILE \
    --output_file $LOCAL_OUTPUT_FILE \
    --print_header \
    --delimited \
    --output_delimiter=','
    # Delete a previous result (if any) and upload the new one to S3.
    hadoop fs -rm $S3_OUTPUT_FILE
    hadoop fs -mkdir -p $(dirname $S3_OUTPUT_FILE)
    hadoop fs -copyFromLocal $LOCAL_OUTPUT_FILE $S3_OUTPUT_FILE
    '''.format(bucket.name, result_file_name))

  5. Generate an EMR step that executes the query runner script with the query file as parameter.

    emr_steps = [
        boto.emr.step.ScriptRunnerStep(
            name='Run query {}'.format(result_file_name),
            step_args=[
                's3://{}/{}'.format(bucket.name, query_runner.key),
                's3://{}/{}'.format(bucket.name, query.key)
            ]
        )
    ]

  6. Connect to EMR in the region where the cluster should run (us-east-1 in this example).

    # connect_to_region picks the right EMR endpoint for the given region.
    emr_connection = boto.emr.connect_to_region(
        'us-east-1',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret,
    )

  7. Start a new EMR cluster and execute the generated EMR step. run_jobflow returns the id of the new job flow; a short sketch of how to wait for the result and shut the cluster down again follows the code below.

    emr_connection.run_jobflow(
        name='Cluster name',
        log_uri='s3://{}/logs'.format(bucket.name),
        ec2_keyname='my_key',
        master_instance_type='m3.xlarge',
        slave_instance_type='m3.2xlarge',
        num_instances=2,
        action_on_failure='CANCEL_AND_WAIT',
        keep_alive=True,
        ami_version='3.3.1',
        bootstrap_actions=[
            boto.emr.BootstrapAction(
                'Install Impala',
                's3://us-east-1.elasticmapreduce/libs/impala/setup-impala',
                [
                    '--install-impala',
                    '--base-path', 's3://us-east-1.elasticmapreduce',
                    '--impala-version', 'latest',
                    'IMPALA_BACKEND_PORT=22001',
                    'IMPALA_MEM_LIMIT=90%',
                ]
            )
        ],
        steps=emr_steps,
    )
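
    Because of keep_alive=True the cluster keeps running after the query step has finished. The following is a minimal sketch (not part of the script above) of how you could wait for the step to complete, download the CSV result, and terminate the cluster; it assumes the id returned by run_jobflow was captured in a variable jobflow_id and reuses emr_connection, bucket, and result_file_name from the previous steps:

    import time
    # Poll until the job flow has finished executing its steps. With
    # keep_alive=True it ends up in the WAITING state once all steps are done.
    while emr_connection.describe_jobflow(jobflow_id).state not in (
            'WAITING', 'COMPLETED', 'TERMINATED', 'FAILED'):
        time.sleep(60)
    # Download the CSV file that the query runner uploaded to S3.
    result = Key(bucket)
    result.key = '{}.csv'.format(result_file_name)
    result.get_contents_to_filename('result.csv')
    # The cluster keeps running (and costing money) until it is terminated.
    emr_connection.terminate_jobflow(jobflow_id)

    If you would rather keep the cluster alive and run further queries on it, you can submit additional steps to the running job flow with emr_connection.add_jobflow_steps(jobflow_id, more_steps) instead of terminating it.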

You can find the entire script here.

Suggestions, comments, critique? Please drop me a note. Feedback is always appreciated!