PYTHONCODEPYTHON CODEDATA MESH
10/02/2023 • Thomas Eeckhout

Hoe gegevens genereren en transformeren in Python

In softwareontwikkeling zijn datatransformatie en het genereren van gegevens uit andere gegevens veelvoorkomende taken. Alle programmeertalen pakken dit op een andere manier aan, elk met hun eigen troeven en minpunten. Afhankelijk van het probleem, zijn sommige manieren meer aangewezen dan andere. In deze blog ontdek je eenvoudige maar toch krachtige methodes om gegevens te genereren en te transformeren in Python.

Voor we een complexer geval bespreken, gaan we van start met een standaardvoorbeeld. Stel dat we een paar winkels hebben en dat elke winkel zijn eigen database heeft met items die door werknemers werden toegevoegd. Sommige velden zijn optioneel, wat betekent dat de werknemers niet altijd alles invullen. Naarmate we groeien, kan het moeilijk worden om een duidelijk beeld te hebben van alle items in onze winkels. Daarom ontwikkelen we een Python-script dat de verschillende items uit de databases van onze winkels haalt en ze verzamelt in een enkele gemeenschappelijke database.

from stores import store_1, store_2, store_3

# Typehints are used throughout the code.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()

Generatoren

Store_1.get_items() resulteert in een generator van items. Generatoren zullen een hoofdrol spelen in deze blog.
We kunnen er een complexe keten van transformaties van enorme hoeveelheden gegevens mee opzetten zonder dat het geheugen opraakt, terwijl onze code beknopt en netjes blijft. Ben je nog niet vertrouwd met Python?

def a_generator():
  for something in some_iterable:
    # do logic
    yield something

Twee dingen zijn hierbij belangrijk. Ten eerste zal het oproepen van een generator geen gegevens opleveren, maar een iterator. Ten tweede worden waarden op verzoek geproduceerd. Hier vind je een gedetailleerde uitleg.

Syntaxis


Er zijn twee manieren om generatoren aan te maken. De eerste manier lijkt op een normale Python-functie, maar heeft een yield statement in plaats van een return statement. De andere is weliswaar beknopter, maar kan snel ingewikkeld worden naarmate de logica complexer wordt. Dit wordt de expressiesyntaxis van de Python-generator genoemd en wordt voornamelijk gebruikt voor eenvoudigere generatoren.

# Basic generator syntax
def generate_until(n: int) -> Generator[int, None, None]:
    while i > n;
        yield i
        i += 1

# Generator expression syntax
gen_until_5: Generator[int, None, None] = (i for i in range(5))

Code

Om het simpel te houden, voeren we het script één keer uit aan het eind van de dag, met als resultaat een volledige database met alle items van alle winkels.

from stores import store_1, store_2, store_3

from database import all_items

# Typehints are used throughout the code.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()
  
# Let's assume our `add_or_update()` function accepts generators.
# If an Item already exists it updates it else it adds it to the database.
# We can just add them one by one like here.
all_items.add_or_update(items_1)
all_items.add_or_update(items_2)
all_items.add_or_update(items_3)

# The database now contains all the latest items from all the stores.

Voor dit voorbeeld werkt dat perfect. Maar als de complexiteit groeit en er meer winkels worden toegevoegd, kan het snel onoverzichtelijk worden. Gelukkig beschikt Python over fantastische geïntegreerde tools om onze code te vereenvoudigen.

Itertools


Een van die modules in Python is itertools. Volgens de Python-documentatie “standaardiseert deze module een basispakket van snelle, geheugenefficiënte tools die afzonderlijk of in combinatie handig zijn. Samen vormen ze een ‘iterator-algebra’ die het mogelijk maakt om gespecialiseerde tools beknopt en efficiënt te bouwen in pure-Python.

itertools.chain() is een geweldige functie. Ze wordt gebruikt om verschillende iterabelen aan elkaar te ‘ketenen’ alsof ze één zijn. We kunnen er onze generatoren mee aan elkaar ketenen.

from stores import store_1, store_2, store_3
from database import all_items

from itertools import chain

# Typehints are used throughout the code.
items_1: Generator[Item, None, None] = store_1.get_items()
items_2: Generator[Item, None, None] = store_2.get_items()
items_3: Generator[Item, None, None] = store_3.get_items()
  
# Using itertools.chain we can add the generators together into one.
# Chain itself is also a generator function so no data will be generated yet.
items: Generator[Item, None, None] = chain(items_1, items_2, items_3)
 
all_items.add_or_update(items) # <- data will be generated here

# The database now contains all the latest items from all the stores

Generatorfuncties

Laten we ervan uitgaan dat ons item een tuple is met vijf velden: naam, merk, leverancier, kostprijs en het aantal stuks in de winkel. Het heeft de volgende signature: tuple[str,str,str,int,int]. Als we de totale waarde willen weten van de items in de winkel, moeten we alleen maar het aantal items vermenigvuldigen met de kostprijs.

# both receives and returns a generator
def calc_total_val(items: Generator) -> Generator:
    for item in items:
        # yield the first 3 items and the product of the last 2
        yield *item[:3], item[3]*item[4]

# we can also write is as a generator expression since it's so simple
((*item[:3], item[3]*item[4]) for item in items)

Nu ziet het er zo uit: tuple[str, str, str, int]. Maar we willen het uitvoeren als JSON. Om dat te doen, kunnen we gewoon een generator aanmaken die een woordenboek weergeeft en daar json.dumps() op oproepen. We gaan ervan uit dat we een iterator van dicts kunnen doorgeven aan de functie add_or_update() en dat deze automatisch json.dumps() oproept.

# both receives and returns a generator
def as_dict_item(items: Generator) -> Generator:
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

Nu we meer logica hebben, gaan we alles samenvoegen. Een groot voordeel van generatoren is hoe duidelijk en overzichtelijk het is om ze te gebruiken. We kunnen een functie creëren voor elke processtap en de gegevens erdoor halen.


from stores import store_1, store_2, store_3
from database import all_items

from itertools import chain

def calc_total_val(items):
    for item in items:
        yield *item[:3], item[3]*item[4]

def as_item_dict(items):
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

items_1 = store_1.get_items()
items_2 = store_2.get_items()
items_3 = store_3.get_items()
  

items = chain(items_1, items_2, items_3) # <- make one big iterable
items = calc_total_val(items) # <- calc the total value
items = as_item_dict(items) # <- transform it into a dict
 
all_items.add_or_update(items) # <- data will be generated here

# The database now contains all the latest items from all the stores

Om de stappen te tonen die we hebben doorlopen, heb ik alles opgesplitst. Er zijn nog enkele zaken die voor verbetering vatbaar zijn. Kijk eens naar de functie calc_total_val(). Dit is een perfect voorbeeld van een situatie waarin een generatorexpressie kan worden gebruikt.

from stores import store_1, store_2, store_3
from database import all_items
from itertools import chain

def as_item_dict(items):
    for item in items:
        yield {
            "name": item[0],
            "brand": item[1],
            "supplier": item[2],
            "total_value": item[3],
        }

items_1 = store_1.get_items()
items_2 = store_2.get_items()
items_3 = store_3.get_items()
  
items = chain(items_1, items_2, items_3)
items = ((*item[:3], item[3]*item[4]) for item in items)
items = as_item_dict(items)
 
all_items.add_or_update(items)

Om het nog overzichtelijker te maken, kunnen we al onze functies in een aparte module stoppen. Op die manier bevat ons hoofdbestand enkel de stappen die de gegevens doorlopen. Als we beschrijvende namen gebruiken voor onze generatoren kunnen we meteen zien wat de code doet. Nu hebben we dus een pijplijn gemaakt voor de gegevens. Hoewel dit maar een eenvoudig voorbeeld is, kan het ook worden toegepast voor complexere workflows.

Gegevensproducten

Alles wat we in het bovenstaande voorbeeld hebben gedaan, kan gemakkelijk worden toegepast op een gegevensproduct. Als je niet vertrouwd bent met gegevensproducten, vind je hier een interessante tekst over data meshes.

Stel dat we een gegevensproduct hebben dat gegevens samenvoegt. Het heeft meerdere inputs met verschillende soorten gegevens. Elk van die inputs moet gefilterd, getransformeerd en opgeschoond worden voordat we ze kunnen samenvoegen tot één output. De client vereist dat de output een enkel JSON-bestand is dat wordt opgeslagen in een S3 bucket. De bestaande infrastructuur staat alleen 500 Mb RAM toe voor de containers. Laten we nu alle gegevens laden, enkele transformaties doen, alles samenvoegen en parsen in een JSON-bestand.

from input_ports import port_1, port_2
from output_ports import S3_port
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

output = []
for row in data_port_1:
    # do some transformation or filtering here
    output.append(row)

for row in data_port_2:
    # do some transformation or filtering here
    output.append(row)

S3_port.save(dumps(output))

Hoewel dit een uitstekende oplossing lijkt die de klus klaart en makkelijk te begrijpen is, crasht onze container plots door een OutOfMemory-fout. Na enkele lokale tests op onze machine, zien we dat het een bestand van 834 Mb heeft opgeleverd, terwijl de container slechts 500 Mb RAM toestaat. Het probleem met de bovenstaande code is dat we alles eerst in een lijst bewaren, zodat alles in het geheugen wordt opgeslagen.

Oplossing

We proberen het nog eens. Voor S3 kunnen we MultipartUpload gebruiken. Dat betekent dat we het volledige bestand niet in het geheugen moeten bewaren. En uiteraard moeten we onze lijsten vervangen door generatoren.

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def port_1_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

def port_2_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))
for part in output:
  S3_port.save_part(dumps(part))

Omdat we nu maar één item tegelijk in het geheugen hebben, neemt dit veel minder geheugen in dan de eerdere oplossing, met quasi geen extra werk. Maar een post request naar S3 sturen voor elk item is misschien wat overdreven. Zeker als we 300.000 items hebben. Maar er is nog een probleem ...

De ‘part size’ moet tussen 5 MiB en 5 GiB liggen. Om dit op te lossen, kunnen we verschillende onderdelen groeperen voor we ze parsen. Groeperen we er echter te veel, dan botsen we opnieuw op de geheugenlimiet. De chunkgrootte moet daarom afhangen van hoe groot de afzonderlijke delen van je gegevens zijn. Laten we om dit te verduidelijken een grootte van 1.000 nemen. Hoe groter de chunkgrootte, hoe meer geheugen er wordt gebruikt, maar hoe minder verzoeken er naar S3 worden gestuurd.
We geven er dus de voorkeur aan om onze chunks zo groot mogelijk te houden, zonder dat het geheugen opraakt.

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain
from json import dumps

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def makebatch(iterable, len):
  for first in iterable:
    yield chain([first], islice(iterable, len - 1))
        
def port_1_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

def port_2_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))
for chunk in makebatch(output, 1000):
  S3_port.save_part(dumps(chunk))

Dat is alles wat er moet gebeuren. Het volstaat om grote hoeveelheden gegevens te transformeren en op te slaan in een S3 bucket, zelfs als de resources schaars zijn.

Bonus


Als je berekeningen rekenintensief zijn, is het makkelijk om ze parallel uit te voeren. Met slechts enkele extra regels kunnen we onze transformatoren uitvoeren met meerdere cores.

from multiprocessing.pool import Pool

with Pool(4) as pool:
  # imap_unordered could also be used if the order is not important
  data_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)
  data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)
  
  output = chain(data_1, data_2)
  

Het mooiste hieraan? We moeten verder niets veranderen, want imap kan net als elke andere generator worden geïtereerd om resultaten te verkrijgen. Laten we nu alles samengooien. Dat is alles wat we nodig hebben voor rekenintensieve transformaties van grote hoeveelheden gegevens, met gebruik van meerdere cores.

from input_ports import port_1, port_2
from output_ports import S3_port
from itertools import chain
from json import dumps
from multiprocessing.pool import Pool

data_port_1: Generator = port_1.get_data()
data_port_2: Generator = port_2.get_data()

def makebatch(iterable, len):
  for first in iterable:
    yield chain([first], islice(iterable, len - 1))
        
def port_1_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

def port_2_transformer(data: Generator):
  for row in data:
    # do some transformation or filtering here
    yield row

with Pool(4) as pool:
  # imap_unordered could also be used if the order is not important
  data_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)
  data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)
  
  output = chain(data_1, data_2)

  for chunk in makebatch(output, 1000):
    S3_port.save_part(dumps(chunk))

Conclusie


Generatoren worden vaak verkeerd begrepen door beginnende ontwikkelaars, maar ze kunnen een interessante tool vormen. Of het nu voor een eenvoudige transformatie of iets meer geavanceerds is zoals een gegevensproduct, Python is een uitstekende keuze omwille van het gebruiksgemak en de overvloed aan tools die beschikbaar zijn in de standaardbibliotheek.