16 - EMR & Docker
[ ]:
import getpass
import boto3
import awswrangler as wr
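The ECR image URIs later in this tutorial are hard-coded to us-east-1. As a quick sanity check before starting, you can print the region of your default boto3 session and the installed awswrangler version (a minimal sketch, not part of the original notebook):
[ ]:
print(boto3.session.Session().region_name)
print(wr.__version__)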
Enter your bucket name:
[2]:
bucket = getpass.getpass()
··········································
Enter your Subnet ID:
[3]:
subnet = getpass.getpass()
························
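If you are not sure which Subnet ID to use, the EC2 API can list the subnets visible to your account (a sketch using boto3; pick one whose instances can reach S3 and ECR):
[ ]:
ec2 = boto3.client("ec2")
for s in ec2.describe_subnets()["Subnets"]:
    print(s["SubnetId"], s["VpcId"], s["AvailabilityZone"])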
Build and Upload Docker Image to an ECR Repository
Replace the {ACCOUNT_ID} placeholder below with your AWS account ID.
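If you prefer to look the account ID up programmatically instead of pasting a literal value, STS exposes it (a minimal sketch):
[ ]:
ACCOUNT_ID = boto3.client("sts").get_caller_identity()["Account"]
print(ACCOUNT_ID)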
[ ]:
%%writefile Dockerfile
FROM amazoncorretto:8

# Base OS updates and build tooling
RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

# Install Python 3 (the Amazon Linux package is python3-devel, not python3-dev)
RUN yum list python3*
RUN yum -y install python3 python3-devel python3-pip python3-virtualenv

# Sanity checks
RUN python -V
RUN python3 -V

# Make PySpark use Python 3 on both driver and executors
ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

# Install awswrangler and verify that it imports
RUN pip3 install --upgrade pip
RUN pip3 install awswrangler
RUN python3 -c "import awswrangler as wr"
[ ]:
%%bash
# Build the image locally and create the ECR repository
docker build -t 'local/emr-wrangler' .
aws ecr create-repository --repository-name emr-wrangler
# Tag for ECR (replace {ACCOUNT_ID} with your AWS account ID)
docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
# Authenticate Docker to ECR (AWS CLI v2 syntax; `aws ecr get-login` was removed in v2)
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com
# Push the image
docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
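Before moving on, you can confirm that the image actually landed in the repository (a sketch using boto3's ECR client; imageTags may be None for untagged layers):
[ ]:
resp = boto3.client("ecr").describe_images(repositoryName="emr-wrangler")
for img in resp["imageDetails"]:
    print(img.get("imageTags"), img["imagePushedAt"])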
Creating EMR Cluster
[4]:
cluster_id = wr.emr.create_cluster(subnet, docker=True)
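Provisioning takes several minutes. If you want to block until the cluster can accept steps, you can poll its state; WAITING means the cluster is up and idle (a sketch using wr.emr.get_cluster_state):
[ ]:
import time

while wr.emr.get_cluster_state(cluster_id) != "WAITING":
    time.sleep(30)  # cluster bootstrap typically takes a few minutes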
Refresh ECR credentials in the cluster (expiration time: 12h)
[5]:
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/")
[5]:
's-1B0O45RWJL8CL'
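The returned value is a regular EMR step ID, so if you capture it instead of discarding it you can poll the refresh step like any other step (a minimal sketch):
[ ]:
refresh_step_id = wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/")
print(wr.emr.get_step_state(cluster_id, refresh_step_id))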
Uploading application script to Amazon S3 (PySpark)
[7]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext
print("Spark Initialized")
import awswrangler as wr
print(f"awswrangler version: {wr.__version__}")
"""
boto3.client("s3").put_object(Body=script, Bucket=bucket, Key="test_docker.py")
Submit PySpark step
[8]:
DOCKER_IMAGE = f"{wr.sts.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(cluster_id, f"s3://{bucket}/test_docker.py", docker_image=DOCKER_IMAGE)
Wait for the Step
[ ]:
import time

while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    time.sleep(5)  # poll instead of busy-waiting
Terminate Cluster
[ ]:
wr.emr.terminate_cluster(cluster_id)
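Termination is asynchronous: the cluster passes through TERMINATING before reaching TERMINATED. You can confirm its state with the same helper used for polling above (a sketch):
[ ]:
print(wr.emr.get_cluster_state(cluster_id))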
Another example with custom configurations
[9]:
cluster_id = wr.emr.create_cluster(
cluster_name="my-demo-cluster-v2",
logging_s3_path=f"s3://{bucket}/emr-logs/",
emr_release="emr-6.7.0",
subnet_id=subnet,
emr_ec2_role="EMR_EC2_DefaultRole",
emr_role="EMR_DefaultRole",
instance_type_master="m5.2xlarge",
instance_type_core="m5.2xlarge",
instance_ebs_size_master=50,
instance_ebs_size_core=50,
instance_num_on_demand_master=0,
instance_num_on_demand_core=0,
instance_num_spot_master=1,
instance_num_spot_core=2,
spot_bid_percentage_of_on_demand_master=100,
spot_bid_percentage_of_on_demand_core=100,
spot_provisioning_timeout_master=5,
spot_provisioning_timeout_core=5,
spot_timeout_to_on_demand_master=False,
spot_timeout_to_on_demand_core=False,
python3=True,
docker=True,
spark_glue_catalog=True,
hive_glue_catalog=True,
presto_glue_catalog=True,
debugging=True,
applications=["Hadoop", "Spark", "Hive", "Zeppelin", "Livy"],
visible_to_all_users=True,
maximize_resource_allocation=True,
keep_cluster_alive_when_no_steps=True,
termination_protected=False,
spark_pyarrow=True,
)
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/")
DOCKER_IMAGE = f"{wr.sts.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(cluster_id, f"s3://{bucket}/test_docker.py", docker_image=DOCKER_IMAGE)
[ ]:
import time

while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    time.sleep(5)  # poll instead of busy-waiting
wr.emr.terminate_cluster(cluster_id)
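Because this cluster was created with logging_s3_path, EMR writes step and node logs under that prefix. Once the run finishes you can inspect them with awswrangler's S3 listing (a sketch):
[ ]:
wr.s3.list_objects(f"s3://{bucket}/emr-logs/")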