10/02/2023 • Thomas Eeckhout

How to generate and transform data in Python

Data transformation and generating data from other data are common tasks in software development. Different programming languages have different ways to achieve this, each with its own strengths and weaknesses. Depending on the problem, some may be preferable to others. In this blog, you will find simple but powerful methods for generating and transforming data in Python.

Before we discuss a more complex case, let’s start with a basic example. Imagine that we own a few stores and each store has its own database with items added by employees. Some fields are optional, which means employees do not always fill out everything. As we grow, it might become difficult to get a clear view of all the items in our stores. Therefore, we develop a Python script that takes the different items from our stores’ databases and collects them in a single unified database.

from typing import Generator

from stores import store_1, store_2, store_3

# Typehints are used throughout the code; `Item` is the stores' item type.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()

Generators

store_1.get_items() returns a generator of items. Generators play a key role in this blog post. With them, we can set up a complex chain of transformations over massive amounts of data without running out of memory, while keeping our code concise and clean. If you are not familiar with generators in Python yet, this is their basic shape:

def a_generator(some_iterable):
  for something in some_iterable:
    # apply whatever logic is needed to each element
    yield something

Two things are important here. First, calling a generator function does not return any data; it returns an iterator. Second, values are produced on demand, one at a time, as that iterator is consumed. A more in-depth explanation can be found here.

Syntax

There are two ways to create generators. The first looks like a normal Python function, but has a yield statement instead of a return statement. The other is more concise but can quickly become convoluted as the logic gets more complex. It’s called the Python generator expression syntax and is mainly used for simpler generators.

# Basic generator syntax
def generate_until(n: int) -> Generator[int, None, None]:
    i = 0
    while i < n:
        yield i
        i += 1

# Generator expression syntax
gen_until_5: Generator[int, None, None] = (i for i in range(5))
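
As a quick sanity check, here is a minimal run of generate_until that shows values only being produced when they are requested:

gen = generate_until(3)  # nothing runs yet, we only get an iterator

print(next(gen))  # 0 <- the function body runs up to the first yield
print(next(gen))  # 1
print(list(gen))  # [2] <- consuming whatever is left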

Code

To keep it simple, we run the script once at the end of the day, leaving us with a single database containing all items from all stores.

from typing import Generator

from stores import store_1, store_2, store_3
from database import all_items

# Typehints are used throughout the code.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()
  
# Let's assume our `add_or_update()` function accepts generators.
# If an item already exists, it is updated; otherwise it is added to the database.
# For now, we simply pass the generators in one by one.
all_items.add_or_update(items_1)
all_items.add_or_update(items_2)
all_items.add_or_update(items_3)

# The database now contains all the latest items from all the stores.

For this use case, this is perfectly fine. But when the complexity grows and more stores are added, it can quickly become cluttered. Fortunately, Python has great built-in tools to simplify our code.

Itertools

One module in Python is called itertools. According to the Python docs, “the module standardizes a core set of fast, memory-efficient tools that are useful by themselves or in combination. Together, they form an ‘iterator algebra’, making it possible to construct specialized tools succinctly and efficiently in pure Python.”

A great function is itertools.chain(). It ‘chains’ multiple iterables together so that they can be consumed as if they were one. We can use it to chain our generators together.
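
A minimal illustration with plain Python iterables:

from itertools import chain

numbers = chain([1, 2], (3, 4), range(5, 7))  # nothing is consumed yet
print(list(numbers))  # [1, 2, 3, 4, 5, 6]

Applied to our stores, it looks like this: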

from typing import Generator

from stores import store_1, store_2, store_3
from database import all_items

from itertools import chain

# Typehints are used throughout the code.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()
  
# Using itertools.chain, we can combine the generators into a single one.
# chain is itself lazy, so no data is produced at this point.
items: Generator[Item, None, None] = chain(items_1, items_2, items_3)
 
all_items.add_or_update(items) # <- data will be generated here

# The database now contains all the latest items from all the stores

Generator Functions

Now let’s assume that each item is a tuple with five fields: name, brand, supplier, cost, and the number of pieces in the store. It has the following signature: tuple[str, str, str, int, int]. If we want the total value of an item in the store, we simply multiply the number of pieces by the cost.

# both receives and returns a generator
def calc_total_val(items: Generator) -> Generator:
    for item in items:
        # yield the first 3 fields and the product of the last 2
        yield *item[:3], item[3]*item[4]

# we can also write it as a generator expression since it's so simple
items = ((*item[:3], item[3]*item[4]) for item in items)

Each item now looks like this: tuple[str, str, str, int]. But we want to output it as JSON. For that, we can create a generator that yields dictionaries and call json.dumps() on them. Let’s assume that we can pass an iterator of dicts to the add_or_update() function and that it automatically calls json.dumps().

# both receives and returns a generator
def as_item_dict(items: Generator) -> Generator:
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

Now that we have more logic, let’s see how we can put it together. One great thing about generators is how clear and concise they are to use. We can create a function for each processing step and run the data through them.


from stores import store_1, store_2, store_3
from database import all_items

from itertools import chain

def calc_total_val(items):
    for item in items:
        yield *item[:3], item[3]*item[4]

def as_item_dict(items):
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

items_1 = store_1.get_items()
items_2 = store_2.get_items()
items_3 = store_3.get_items()
  

items = chain(items_1, items_2, items_3) # <- make one big iterable
items = calc_total_val(items) # <- calc the total value
items = as_item_dict(items) # <- transform it into a dict
 
all_items.add_or_update(items) # <- data will be generated here

# The database now contains all the latest items from all the stores

To show the steps that we have taken, I split everything up. There are still some things that could be improved. Take a look at the function calc_total_val(). This is a perfect example of a situation where a generator expression can be used.

from stores import store_1, store_2, store_3
from database import all_items
from itertools import chain

def as_item_dict(items):
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

items_1 = store_1.get_items()
items_2 = store_2.get_items()
items_3 = store_3.get_items()
  
items = chain(items_1, items_2, items_3)
items = ((*item[:3], item[3]*item[4]) for item in items)
items = as_item_dict(items)
 
all_items.add_or_update(items)

To make it even cleaner, we can put all of our functions into a separate module. In this way, our main file only contains the steps the data goes through. If we use descriptive names for our generators, we can immediately see what the code will do. So now we have created a pipeline for the data. While this is only a simple example, it can also be used for more complicated workflows.
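
As a rough sketch (the transformations module name is just an assumption, not something defined earlier), the main file could then shrink to the pipeline itself:

from itertools import chain

from stores import store_1, store_2, store_3
from database import all_items
from transformations import calc_total_val, as_item_dict  # hypothetical module holding our generator functions

items = chain(store_1.get_items(), store_2.get_items(), store_3.get_items())
items = calc_total_val(items)
items = as_item_dict(items)

all_items.add_or_update(items)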

Data products

Everything we did in the example above can be easily applied to a Data Product. If you are not familiar with data products, here is a great text on data meshes.


Imagine that we have a data product that does some data aggregation. It has multiple inputs with different kinds of data. Each of those inputs needs to be filtered, transformed, and cleaned before we can aggregate them into one output. The client requires the output to be a single JSON file stored in an S3 bucket. The existing infrastructure only allows 500 MB of RAM for the containers. Now let’s load all the data, do some transformations, aggregate everything, and serialize it into a JSON file.

from typing import Generator

from input_ports import port_1, port_2
from output_ports import S3_port
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

output = []
for row in data_port_1:
    # do some transformation or filtering here
    output.append(row)

for row in data_port_2:
    # do some transformation or filtering here
    output.append(row)

S3_port.save(dumps(output))

While this looks like an excellent solution that does the job and is easy to understand, our container suddenly crashes with an OutOfMemory error. After some local testing on our own machine, we see that it produces an 834 MB file, which cannot work with only 500 MB of RAM for the container. The problem with the code above is that we first collect everything in a list, so all of the data is held in memory at once.
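
To make the difference concrete, here is a small, self-contained comparison of a list and a generator built from the same expression (exact sizes vary by Python version):

import sys

as_list = [i * i for i in range(1_000_000)]  # every element exists in memory at once
as_gen = (i * i for i in range(1_000_000))   # elements are produced on demand

print(sys.getsizeof(as_list))  # several MB for the list object alone, plus the elements
print(sys.getsizeof(as_gen))   # a couple of hundred bytes, no matter how long the sequence is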

Solution

Let’s give it another try. For S3, we can use a multipart upload, which means we do not need to keep the entire file in memory. And of course, we should replace our list with generators.

from typing import Generator

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def port_1_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

def port_2_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))
for part in output:
  S3_port.save_part(dumps(part))

Since we now only have one item in memory at a time, this uses dramatically less memory than the earlier solution, with almost no extra work. However, sending a request to S3 for every single item might be a bit much, especially if we have 300,000 items. But there is another issue …

The part size of an S3 multipart upload should be between 5 MiB and 5 GiB. To fix this, we can group multiple items into one part before we serialize and upload them. But if we group too many, we will once again hit the memory limit. The right chunk size therefore depends on how large the individual items are. To demonstrate, let’s use a chunk size of 1,000 items. The larger the chunk, the more memory is used but the fewer requests go to S3, so we want our chunks to be as large as possible without running out of memory.

from typing import Generator

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain, islice
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def makebatch(iterable, size):
  # Group an iterator into batches of at most `size` items.
  # Each batch must be fully consumed before the next one is requested.
  iterator = iter(iterable)
  for first in iterator:
    yield chain([first], islice(iterator, size - 1))
        
def port_1_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

def port_2_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))
for chunk in makebatch(output, 1000):
  # json.dumps cannot serialize an iterator, so we materialize one chunk at a time
  S3_port.save_part(dumps(list(chunk)))
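
To see the batching behaviour in isolation, here is a tiny run of the makebatch helper defined above:

batches = [list(batch) for batch in makebatch(range(7), 3)]
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]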

That is all that is needed to transform vast amounts of data and save it to an S3 bucket, even when resources are scarce.
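
We treated S3_port as a black box. For completeness, here is a minimal sketch of what it could look like on top of boto3's multipart upload API; the bucket and key are made-up parameters, and error handling and the 5 MiB minimum part size are ignored here:

import boto3

class S3Port:
    def __init__(self, bucket: str, key: str):
        self.client = boto3.client("s3")
        self.bucket = bucket
        self.key = key
        upload = self.client.create_multipart_upload(Bucket=bucket, Key=key)
        self.upload_id = upload["UploadId"]
        self.parts = []

    def save_part(self, body: str) -> None:
        # Upload one part and remember its ETag for the final completion call.
        part_number = len(self.parts) + 1
        response = self.client.upload_part(
            Bucket=self.bucket,
            Key=self.key,
            PartNumber=part_number,
            UploadId=self.upload_id,
            Body=body.encode("utf-8"),
        )
        self.parts.append({"ETag": response["ETag"], "PartNumber": part_number})

    def complete(self) -> None:
        # Stitch the uploaded parts together into the final object.
        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.upload_id,
            MultipartUpload={"Parts": self.parts},
        )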

Bonus


When the transformations are compute-intensive, running them in parallel is easy. With just a few extra lines, we can run our transformers on multiple cores. One thing changes: with pool.imap, each transformer is called with a single row and returns the transformed row, instead of receiving the whole generator.

from multiprocessing.pool import Pool

with Pool(4) as pool:
  # imap_unordered could also be used if the order is not important
  data_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)
  data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)

  output = chain(data_1, data_2)
  # `output` has to be consumed before the pool is closed (see the full example below)

The best part about this? The rest of the pipeline stays the same: the iterator returned by imap can be consumed just like any other generator. Now let’s throw it all together. This is all we need for compute-intensive transformations over large amounts of data, using multiple cores.

from typing import Generator

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain, islice
from json import dumps
from multiprocessing.pool import Pool

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def makebatch(iterable, size):
  # Group an iterator into batches of at most `size` items.
  iterator = iter(iterable)
  for first in iterator:
    yield chain([first], islice(iterator, size - 1))

def port_1_transformer(row):
  # transform a single row here
  return row

def port_2_transformer(row):
  # transform a single row here
  return row

# On platforms that spawn worker processes (Windows, macOS), the part below
# should run under an `if __name__ == "__main__":` guard.
with Pool(4) as pool:
  # imap_unordered could also be used if the order is not important
  data_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)
  data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)

  output = chain(data_1, data_2)

  # Consume the results while the pool is still open.
  for chunk in makebatch(output, 1000):
    S3_port.save_part(dumps(list(chunk)))

Conclusion

Generators are often misunderstood by new developers, but they can be an excellent tool. Whether for a simple transformation or something more advanced such as a data product, Python is a great choice because of its ease of use and the abundance of tools available within the standard library.