As a data engineer, you’re often dealing with large amounts of data coming from various sources and have to make sense of them. Extracting, Transforming, and Loading (ETL) data to get it where it needs to go is part of your job, and it can be a tough one when there are so many moving parts.

Fortunately, Luigi is here to help. An open source project developed by Spotify, Luigi helps you build complex pipelines of batch jobs. It has been used to automate Spotify’s data processing (billions of log messages and terabytes of data), and it is also used at other well-known companies such as Foursquare, Stripe, Asana, and Red Hat.

Luigi handles dependency resolution, workflow management, and helps you recover from failures gracefully. It plugs into the services you already use, whether that means running a Spark job, dumping a table from a database, or running a Python snippet.

In this example, you’ll learn how to ingest log files, run some transformations and calculations, then store the results.

Let’s get started.

Prerequisites

For this tutorial, we’re going to be using Python 3.6.3, Luigi, and this fake log generator. Although you could use your own production log files, not everyone has access to those, so we’ll use a tool that produces dummy data while we’re testing. Once the workflow is running, you can try it out on the real thing.

To start, make sure you have Python 3, pip, and virtualenv installed. Then we’ll set up our environment and install the luigi package:
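The exact commands vary a bit by platform, but something along these lines should do the trick (the venv directory name is just a placeholder):

    virtualenv -p python3 venv
    source venv/bin/activate
    pip install luigi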

 

This sets up an isolated Python environment and installs the necessary dependencies. Next, we’ll need to obtain some test data to use in our data pipeline.

Generate test data

Since this example deals with log files, we can use this Fake Apache Log Generator to generate dummy data easily. If you have your own files you would like to use, that is fine, but we will be using mocked data for the purposes of this tutorial.

To use the log generator, clone the repository into your project directory. Then navigate into the Fake-Apache-Log-Generator folder and use pip to install the dependencies:
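Assuming the generator still lives at its original GitHub location, the steps look roughly like this:

    git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
    cd Fake-Apache-Log-Generator
    pip install -r requirements.txt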

After that completes, it’s pretty simple to generate a log file. You can find detailed options in the documentation, but let’s generate a 1000-line .log file:
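At the time of writing, the generator’s script and flags look like this (double-check its README in case they’ve changed):

    python apache-fake-log-gen.py -n 1000 -o LOG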

This creates a file called access_log_{timestamp}.log.

Now we can set up a workflow in Luigi to analyze this file.

Luigi Tasks

You can think of building a Luigi workflow as similar to building a Makefile. There are a series of Tasks and dependencies that chain together to create your workflow.

Let’s say we want to figure out how many unique IPs have visited our website. There are a few steps we need to go through to do this:

  • Read the log file
  • Pull out the IP address from each line
  • Determine how many unique IPs there are (de-duplicating them)
  • Persist that count somewhere

Each of those steps can be packaged as a Luigi Task. Each Task has three component parts (we’ll sketch a bare-bones Task right after this list):

  • run() — This contains the logic of your Task. Whether you’re submitting a Spark job, running a Python script, or querying a database, this function contains the actual execution logic. We break up our process, as above, into small chunks so we can run them in a modular fashion.
  • output() — This defines where the results will go. You can write to a file, update HDFS, or add new records in a database. Luigi provides multiple output targets for you to use, or you can create your own.
  • requires() — This is where the magic happens. You can define dependencies of your task here. For instance, you may need to make sure a certain dataset is updated, or wait for another task to finish first. In our example, we need to de-duplicate the IPs before we can get an accurate count.
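Here’s what that bare skeleton looks like in code. This is only a sketch to show the shape of a Task; the class and file names are placeholders, not anything from the real pipeline we’re about to build:

    import luigi

    class UpstreamTask(luigi.Task):
        def output(self):
            # Where this Task's results go.
            return luigi.LocalTarget("upstream.txt")

        def run(self):
            # The actual work happens here.
            with self.output().open("w") as out:
                out.write("hello")

    class DownstreamTask(luigi.Task):
        def requires(self):
            # Luigi makes sure UpstreamTask has finished before this Task runs.
            return UpstreamTask()

        def output(self):
            return luigi.LocalTarget("downstream.txt")

        def run(self):
            # Read the upstream output, transform it, and write the result.
            with self.input().open("r") as infile, self.output().open("w") as out:
                out.write(infile.read().upper())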

Our first Task just needs to read the file. Since it is the first task in our pipeline, we don’t need a requires() function or a run() function. All this first task does is send the file along for processing. For the purposes of this tutorial, we’ll write the results to a local file. You can find more information on how to connect to databases, HDFS, S3, and more in the documentation.

Building a Task in Luigi is as simple as writing a Python class:
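Here’s a minimal sketch of that first Task. The class name and file path are mine, so adjust them to match the log file the generator gave you:

    import luigi

    class ReadLogFile(luigi.Task):
        """The log file already exists on disk, so there's no requires() or run();
        this Task just points at the file so the next Task can depend on it."""

        def output(self):
            # Placeholder path -- change it to your generated file, which will be
            # named something like access_log_{timestamp}.log
            return luigi.LocalTarget("access_log.log")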

 

So that’s a fairly boring task. All it does is grab the file and send it along to the next Task for processing. Let’s write that next Task now, which will pull out the IPs and put them in a list, throwing out any duplicates:
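A sketch of that second Task might look like this. It lives in the same file and assumes the ReadLogFile Task and the numips.txt output name used in this walkthrough:

    class CountUniqueIPs(luigi.Task):
        """Pull the IP out of each log line, de-duplicate, and write the count."""

        def requires(self):
            # We need the raw log file before we can do anything with it.
            return ReadLogFile()

        def output(self):
            return luigi.LocalTarget("numips.txt")

        def run(self):
            ips = []
            with self.input().open("r") as log_file:
                for line in log_file:
                    # The IP address is the first whitespace-separated field
                    # in an Apache-style log line.
                    fields = line.split()
                    if fields and fields[0] not in ips:
                        ips.append(fields[0])
            with self.output().open("w") as out:
                out.write(str(len(ips)))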

 

Even though this Task is a little more complicated, you can still see that it’s built on three component parts: requires, run, and output. It pulls in the data, splits it up, then adds the IPs to a list. Then it counts the number of elements in that list and writes that to a file.

If you try to run the program now, nothing will happen. That’s because we haven’t told Luigi to actually start running these tasks yet. We do that by calling luigi.run (you can also run Luigi from the command line):
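Assuming the Task names from the sketches above, the bottom of the same file might look like this:

    if __name__ == "__main__":
        luigi.run(["--local-scheduler"], main_task_cls=CountUniqueIPs)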

 

The run function takes two arguments: a list of options to pass to Luigi and the task you want to start on. While you’re testing, it helps to pass the --local-scheduler option; this allows the processes to run locally. When you’re ready to move things into production, you’ll use the Central Scheduler. It provides a web interface for managing your workflows and dependency graphs.
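If you’d rather kick things off from the command line instead, the equivalent invocation looks roughly like this (log_pipeline is a hypothetical name for the module holding the Tasks above):

    PYTHONPATH='.' luigi --module log_pipeline CountUniqueIPs --local-scheduler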

If you run your Python file at this point, the Luigi process will kick off and you’ll see the progress of each task as it moves through the pipeline. At the end, you’ll see which tasks succeeded, which failed, and the status of the run overall (Luigi gives it a 🙂 or 🙁 face).

Check out numips.txt to see the result of this run. In my example, it returned the number 1000. That means that 1000 unique IP addresses visited the site in this dummy log file. Try it on one of your logs and see what you get!

Extending your ETL

While this is a fairly simple example, there are a lot of ways that you could easily extend this. You could:

  • Split out dates and times as well, and filter the logs to see how many visitors there were on a certain day
  • Geolocate the IPs to find out where your visitors are coming from
  • See how many errors (404s) your visitors are encountering (there’s a quick sketch of this one after the list)
  • Put the data in a more robust data store
  • Build scheduling so that you can run the workflow periodically on the updated logs
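For instance, the error-counting idea could just be another Task hanging off the same pipeline. Here’s a rough sketch, reusing the hypothetical ReadLogFile Task from earlier:

    class CountNotFoundErrors(luigi.Task):
        """Count how many requests in the log came back with a 404 status."""

        def requires(self):
            return ReadLogFile()

        def output(self):
            return luigi.LocalTarget("num404s.txt")

        def run(self):
            not_found = 0
            with self.input().open("r") as log_file:
                for line in log_file:
                    # In the Apache combined log format, the status code sits
                    # just after the quoted request string.
                    if '" 404 ' in line:
                        not_found += 1
            with self.output().open("w") as out:
                out.write(str(not_found))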

Try some of these options out and let us know if you have any questions! Although this example is limited in scope, Luigi is robust enough to handle Spotify-scale datasets, so let your imagination go wild.


Al Nelson

Al is a geek about all things tech. He's a professional technical writer and software developer who loves writing for tech businesses and cultivating happy users. You can find him on the web at http://www.alnelsonwrites.com or on Twitter as @musegarden.
