Using Apache Airflow’s Docker Operator with Amazon’s Container Repository

Here is a Python version of fetching the ECR token for an AWS repository and storing it in Airflow's docker_default connection.

There is nothing to install, and everything runs smoothly from inside the Airflow Docker containers.

To make the script reusable, you need to create an Airflow Variable called “aws_region_name” and set it to the correct region, for example “eu-central-1”.

"""
You need to create a variable called "aws_region_name" and set it to the correct region, for example "eu-central-1"
"""
from datetime import datetime, timedelta

from airflow.decorators import dag, task

from airflow import settings
from airflow.models import Connection

#Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

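# ECR tokens expire after 12 hours, so refresh at minute 0 of every 10th hour (00:00, 10:00, 20:00)
@dag(default_args=default_args, schedule_interval='0 */10 * * *', max_active_runs=1, start_date=datetime(2021, 1, 1), catchup=False, tags=['airflow'])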
def refresh_docker_token_DAG():

    @task(multiple_outputs=True)
    def extract():
        import boto3
        from airflow.models import Variable
        aws_region_name = Variable.get("aws_region_name")
        ecr = boto3.client('ecr', region_name=aws_region_name)
        response = ecr.get_authorization_token()
        token = response['authorizationData'][0]['authorizationToken']
        registry_url = response['authorizationData'][0]['proxyEndpoint']

        return {"token": token, "registry_url": registry_url}



    @task()
    def set_token(token: str, registry_url: str):
        import logging
        import base64
        logger = logging.getLogger(__name__)

        connection_name = "docker_default"
        conn_type = "docker"
        host = registry_url
        port = None
        # The decoded token has the form "AWS:<password>"; split on the first colon only
        user, password = base64.b64decode(token).decode().split(":", 1)
        schema = ""
        extra = ""

        session = settings.Session()
        try:
            connection_query = session.query(Connection).filter(Connection.conn_id == connection_name)
            connection_query_result = connection_query.one_or_none()
            if not connection_query_result:
                connection = Connection(conn_id=connection_name, conn_type=conn_type, host=host, port=port,
                                        login=user, password=password, schema=schema, extra=extra)
                session.add(connection)
                session.commit()
            else:
                connection_query_result.host = host
                connection_query_result.login = user
                connection_query_result.schema = schema
                connection_query_result.port = port
                connection_query_result.extra = extra
                connection_query_result.set_password(password)
                session.add(connection_query_result)
                session.commit()

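        except Exception:
            # Roll back the failed transaction and fail the task so a stale token is noticed
            session.rollback()
            logger.exception("Failed creating or updating connection %s", connection_name)
            raise
        finally:
            session.close()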

    data = extract()
    set_token(data["token"], data["registry_url"])

refresh_docker_token_dag = refresh_docker_token_DAG()
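The username/password split inside set_token can be factored into a small helper that is easy to unit test. This is a minimal sketch, assuming (as the DAG above does) that the decoded ECR token has the form "AWS:<password>"; decode_ecr_token is a hypothetical name, not part of boto3 or Airflow.

```python
import base64
from typing import Tuple


def decode_ecr_token(token: str) -> Tuple[str, str]:
    """Split a base64 ECR authorization token into (username, password).

    The decoded token looks like "AWS:<password>", so split on the first
    colon only, in case the password part ever contains a colon.
    """
    decoded = base64.b64decode(token).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password


# Usage with a made-up token (not a real credential):
fake_token = base64.b64encode(b"AWS:example-password").decode("ascii")
print(decode_ecr_token(fake_token))  # ('AWS', 'example-password')
```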




jq

Piping

jq works with a chain of filters. Each filter produces a result that can then be filtered further.

json='{"person": {"name": "Ivo", "phone": "123"}}'
echo $json | jq .person.name
"Ivo"

is the same as

echo $json | jq ".person | .name"
"Ivo"

because .person produces {"name": "Ivo", "phone": "123"}, and the | passes that object on to .name.
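To make the pipe semantics concrete, here is a rough Python analogy (not how jq is implemented): each filter is a function, and | feeds one function's output into the next. field and pipe are hypothetical helpers written for this illustration.

```python
import json
from functools import reduce
from typing import Any, Callable


def field(name: str) -> Callable[[Any], Any]:
    """Return a filter that picks one key, like jq's .name."""
    return lambda value: value[name]


def pipe(*filters: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Chain filters left to right, like jq's |."""
    return lambda value: reduce(lambda acc, f: f(acc), filters, value)


data = json.loads('{"person": {"name": "Ivo", "phone": "123"}}')

# .person alone yields the whole inner object ...
print(field("person")(data))  # {'name': 'Ivo', 'phone': '123'}
# ... and piping it into .name yields just the name.
print(pipe(field("person"), field("name"))(data))  # Ivo
```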

Extract multiple fields

This is done by listing the fields separated by a comma (,).

Just do

echo $json | jq ".person | (.name,.phone)"
"Ivo"
"123"

But if you want to concatenate them, build a new object:

echo $json | jq ".person | {name_with_phone:(.name + \"-\" + .phone)}"
{
  "name_with_phone": "Ivo-123"
}

...and finally extract only name_with_phone:

echo $json | jq ".person | {name_with_phone:(.name + \"-\" + .phone)} | .name_with_phone" 
"Ivo-123"

...and to get only the raw value without quotes, add -r:

echo $json | jq -r ".person | {name_with_phone:(.name + \"-\" + .phone)} | .name_with_phone"
Ivo-123
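For comparison, here are the same three steps in plain Python with the standard json module. This is only a sketch to show what each jq expression produces: json.dumps mimics jq's quoted JSON output, while printing the plain string mimics -r.

```python
import json

raw = '{"person": {"name": "Ivo", "phone": "123"}}'
person = json.loads(raw)["person"]

# jq: .person | (.name,.phone)  -> two separate JSON outputs
for value in (person["name"], person["phone"]):
    print(json.dumps(value))  # "Ivo", then "123"

# jq: .person | {name_with_phone:(.name + "-" + .phone)}
combined = {"name_with_phone": person["name"] + "-" + person["phone"]}
print(json.dumps(combined, indent=2))

# jq -r "... | .name_with_phone"  -> raw string without quotes
print(combined["name_with_phone"])  # Ivo-123
```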

Here is an example where I grab the location IDs from all parent levels:

cat * | jq '.deviceLocationUpdate.location | .locationId,.parent.locationId,.parent.parent.locationId,.parent.parent.parent.locationId,.parent.parent.parent.parent.locationId' | sort | uniq
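The jq command above spells out five nesting levels by hand. A Python sketch that walks the parent chain to any depth could look like this; it assumes each input string holds exactly one JSON document with the deviceLocationUpdate.location.parent structure used above, and the helper names and sample values are hypothetical.

```python
import json
from typing import Any, Iterable, List, Set


def collect_location_ids(location: Any) -> Set[Any]:
    """Walk a location and all of its parents, collecting every locationId."""
    ids = set()
    while isinstance(location, dict):
        if location.get("locationId") is not None:
            ids.add(location["locationId"])
        location = location.get("parent")
    return ids


def unique_location_ids(documents: Iterable[str]) -> List[Any]:
    """Rough equivalent of the cat | jq | sort | uniq pipeline."""
    ids = set()
    for text in documents:
        record = json.loads(text)
        location = record.get("deviceLocationUpdate", {}).get("location")
        ids |= collect_location_ids(location)
    return sorted(ids, key=str)


sample = ('{"deviceLocationUpdate": {"location": {"locationId": "room-1", '
          '"parent": {"locationId": "floor-2", "parent": {"locationId": "building-3"}}}}}')
print(unique_location_ids([sample, sample]))  # ['building-3', 'floor-2', 'room-1']
```

Unlike the jq version, missing levels are skipped instead of showing up as null.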

Calisthenics in programming

https://blog.avenuecode.com/object-calisthenics-principles-for-better-object-oriented-code

The Object Calisthenics principles are:

  1. Use only one level of indentation per method
  2. Don’t use the else keyword
  3. Wrap all primitives and strings
  4. Use only one dot per line
  5. Don’t abbreviate
  6. Keep all entities small
  7. Don’t use any classes with more than two instance variables
  8. Use first-class collections
  9. Don’t use any getters/setters/properties
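As an illustration of rules 3 and 8 in Python (the linked article uses PHP), here is a small sketch. EmailAddress and Recipients are hypothetical classes written for this example, not taken from the article.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass(frozen=True)
class EmailAddress:
    """Rule 3: wrap a primitive string so validation lives in one place."""
    value: str

    def __post_init__(self) -> None:
        if "@" not in self.value:
            raise ValueError(f"invalid e-mail address: {self.value!r}")


class Recipients:
    """Rule 8: a first-class collection; the class holds nothing but the list."""

    def __init__(self, addresses: List[EmailAddress]) -> None:
        self._addresses = list(addresses)

    def add(self, address: EmailAddress) -> "Recipients":
        # Return a new collection instead of handing the raw list to callers.
        return Recipients(self._addresses + [address])

    def __len__(self) -> int:
        return len(self._addresses)

    def __iter__(self) -> Iterator[EmailAddress]:
        return iter(self._addresses)


recipients = Recipients([]).add(EmailAddress("ivo@example.com"))
print(len(recipients))  # 1
```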

I am not sure why those rules should be considered calisthenics.

They should be considered the basic foundation of coding. If you don’t follow them, you can’t even walk, so they are not any kind of calisthenics.

Also, I am happy that the examples are in PHP, which shows that PHP is no longer the swamp it used to be.

Notify when some processes are finished

I have a case where I have to process thousands of files.

I used GNU parallel to run the jobs in batches, but I don’t want to keep checking to see when they finish.

Here is the script I use to check whether any processes named “convert” are still running:

#!/bin/bash

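# Count processes whose command line contains "convert"
# (unlike "ps aux | grep convert", pgrep does not count itself)
number=$(pgrep -c -f convert)
echo "$number"
if [ "$number" -eq 0 ]; then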
	telegram-send "finished converting"
	sleep 60
fi

Then run it inside a screen session:

watch -n 60 ./notify.sh

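That way, once no “convert” process is left, you will get a Telegram message on every run (roughly every two minutes, because watch waits 60 seconds after the script's own sleep 60) until you stop watch.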

telegram-send can be installed with pip install telegram-send
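If you prefer a single notification instead of a repeating one, the same idea can be sketched in Python. This assumes pgrep and a configured telegram-send are on the PATH; count_processes, notify_telegram and wait_until_finished are hypothetical helpers, and the counter and notifier are passed in as functions so the loop can be tested without either tool.

```python
import subprocess
import time
from typing import Callable


def count_processes(pattern: str) -> int:
    """Count processes whose full command line matches pattern (via pgrep -f)."""
    result = subprocess.run(["pgrep", "-f", pattern], capture_output=True, text=True)
    return len(result.stdout.split())


def notify_telegram(message: str) -> None:
    """Send a message with the telegram-send CLI."""
    subprocess.run(["telegram-send", message], check=True)


def wait_until_finished(
    count: Callable[[], int],
    notify: Callable[[], None],
    interval: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll count() every interval seconds and call notify() once it reaches zero."""
    while count() > 0:
        sleep(interval)
    notify()


# Usage (needs pgrep and a configured telegram-send):
# wait_until_finished(lambda: count_processes("convert"),
#                     lambda: notify_telegram("finished converting"))
```

The loop exits after the first message, so there is no need to stop watch by hand.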

Watch YouTube in the Background

The YouTube app is very rude and does not allow you to watch videos in the background.

There was a nice solution with Shortcuts on iOS, but it occasionally stops working.

There are also a lot of Telegram bots that can help you download videos, but none of them is easy to use or lets you save bandwidth by choosing the quality/size of the stream.

That’s why I wrote a small Telegram bot in Go to help me.

Here is how it works.

Install these apps on your Apple mobile device:

  • VLC – free open source app
  • Telegram – a popular chat program
  • YouTube app

Open Telegram and add/find the Bot with the name “Audio Helper (Youtube)”.

When you paste a link, the Audio Helper bot will ask you about the audio quality and offer to open the link or to use “more options”.

Click “more options” and you can choose between “download in VLC” and “stream in VLC”.

Next time you want to watch a video on a locked screen, you can share the clip directly to the Telegram bot.

Enjoy watching YouTube videos on a locked screen.

Creating an AWS-enabled local Spark

Install pyspark

First we need to choose the Spark version. It could be 2.4 or later; in our case it is 2.4.6.

The installation method is with conda:

conda install -c conda-forge pyspark=2.4.6

Install Java

We need the right version of Java. There is a problem with Java 8 update 272 (1.8.0.272), which comes with Amazon Linux 2, so we first have to remove that version and install an older one.

Query for the current installed openjdk:

rpm -qa | grep java

You will see something like:

java-1.8.0-openjdk-headless-1.8.0.272.b10-1.amzn2.0.1.x86_64
java-1.8.0-openjdk.x86_64 1:1.8.0.272.b10-1.amzn2.0.1

Then remove both packages:

yum remove java-1.8.0-openjdk java-1.8.0-openjdk-headless
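To check a machine before starting Spark, the installed OpenJDK 8 update can be read from the rpm output. A minimal sketch, assuming package versions look like the lines shown above ("1.8.0.<update>.b<build>"); the helper names are hypothetical, and PROBLEMATIC_UPDATES only contains 272 because that is the only version reported as broken here.

```python
import re
from typing import Iterable, Set

# "1.8.0.<update>.b<build>" as it appears in OpenJDK 8 package versions
JDK8_VERSION = re.compile(r"1\.8\.0\.(\d+)\.b\d+")

PROBLEMATIC_UPDATES = {272}


def installed_jdk8_updates(rpm_lines: Iterable[str]) -> Set[int]:
    """Extract OpenJDK 8 update numbers from `rpm -qa | grep java` output lines."""
    updates = set()
    for line in rpm_lines:
        match = JDK8_VERSION.search(line)
        if match:
            updates.add(int(match.group(1)))
    return updates


def has_problematic_java(rpm_lines: Iterable[str]) -> bool:
    return bool(installed_jdk8_updates(rpm_lines) & PROBLEMATIC_UPDATES)


sample = [
    "java-1.8.0-openjdk-headless-1.8.0.272.b10-1.amzn2.0.1.x86_64",
    "java-1.8.0-openjdk.x86_64 1:1.8.0.272.b10-1.amzn2.0.1",
]
print(installed_jdk8_updates(sample))  # {272}
print(has_problematic_java(sample))  # True
```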

Going for Java 8 update 265

yum -v list java-1.8.0-openjdk-headless  --show-duplicates
yum -v list java-1.8.0-openjdk  --show-duplicates
...
yum install java-1.8.0-openjdk-1.8.0.265.b01-1.amzn2.0.1
The headless package will be installed as a dependency of the command above.

Update alternatives

alternatives --config java

AWS Enable Local Spark

Check which version of hadoop-common you have:

ls -l /opt/anaconda3/envs/advanced/lib/python2.7/site-packages/pyspark/jars/hadoop*
....
hadoop-common-2.7.3.jar
...

That means we have to stick to the AWS SDK that matches Hadoop 2.7.3: download hadoop-aws-2.7.3.jar and its dependency aws-java-sdk-1.7.4.jar. A great tutorial can be found here.
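The version lookup can be scripted as well. This sketch assumes jar file names follow the hadoop-common-X.Y.Z.jar pattern shown above; the helper names are hypothetical, and the version table only lists the 2.7.3 to 1.7.4 pairing from this post, so other Hadoop versions need their own entry.

```python
import re
from typing import Iterable, List, Optional

HADOOP_COMMON_JAR = re.compile(r"^hadoop-common-(\d+\.\d+\.\d+)\.jar$")

# hadoop-aws version -> aws-java-sdk version it was built against (only the pair used here)
AWS_SDK_FOR_HADOOP = {"2.7.3": "1.7.4"}


def hadoop_common_version(jar_names: Iterable[str]) -> Optional[str]:
    """Return the hadoop-common version found among the given jar file names."""
    for name in jar_names:
        match = HADOOP_COMMON_JAR.match(name)
        if match:
            return match.group(1)
    return None


def required_aws_jars(hadoop_version: str) -> List[str]:
    """Jar names to download for s3a:// support; raises KeyError for unknown versions."""
    sdk_version = AWS_SDK_FOR_HADOOP[hadoop_version]
    return [f"hadoop-aws-{hadoop_version}.jar", f"aws-java-sdk-{sdk_version}.jar"]


# In practice jar_names could come from os.listdir() on pyspark's jars directory.
jar_names = ["hadoop-annotations-2.7.3.jar", "hadoop-common-2.7.3.jar"]
version = hadoop_common_version(jar_names)
print(version)  # 2.7.3
print(required_aws_jars(version))  # ['hadoop-aws-2.7.3.jar', 'aws-java-sdk-1.7.4.jar']
```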

So the final code to get Spark running is:

import os

from pyspark.sql import SparkSession, SQLContext


def create_local_spark():
    jars = [
        "/opt/jars/hadoop-lzo-0.4.21-SNAPSHOT.jar",
        "/opt/jars/aopalliance-1.0.jar",
        "/opt/jars/bcprov-jdk15on-1.51.jar",
        "/opt/jars/ion-java-1.0.2.jar",
        "/opt/jars/jcl-over-slf4j-1.7.21.jar",
        "/opt/jars/slf4j-api-1.7.21.jar",
        "/opt/jars/bcpkix-jdk15on-1.51.jar",
        "/opt/jars/emrfs-hadoop-assembly-2.19.0.jar",
        "/opt/jars/javax.inject-1.jar",
        "/opt/jars/jmespath-java-1.11.129.jar",
        "/opt/jars/s3-dist-cp-2.7.0.jar",
        "/opt/jars/s3-dist-cp.jar",
        "/opt/jars/mysql-connector-java-5.1.39.jar",
    ]

    aws_1 = [
        "/opt/jars/hadoop-aws-2.7.3.jar",
        "/opt/jars/aws-java-sdk-1.7.4.jar",
    ]

    jars_string = ",".join(jars + aws_1)
    pyspark_shell = "--jars {} --driver-memory 4G pyspark-shell".format(jars_string)

    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_shell
    os.environ["PYSPARK_PYTHON"] = "/opt/anaconda3/envs/advanced/bin/python"

    spark_session = SparkSession.builder.appName("ZZZZZ").getOrCreate()
    hadoop_conf = spark_session._jsc.hadoopConfiguration()

    hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.server-side-encryption-algorithm", "AES256")
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")

    spark_context = spark_session.sparkContext
    sql_context = SQLContext(spark_context)
    # df = spark_session.read.json("s3a://hello/world/")
    return spark_context, sql_context

Bookmarks January 2021


Backup whole hard disk with squashfs

A great blog post showing how to back up an SSD with squashfs and dd.
Here is a PDF version.

Rails 6.1 and Webpack


Python meta classes


© 2021 Gudasoft
