Fermer

juillet 15, 2025

De 48 heures à 6 minutes: mon voyage Optimisant un processus de réconciliation des pandas pour les données à grande échelle

De 48 heures à 6 minutes: mon voyage Optimisant un processus de réconciliation des pandas pour les données à grande échelle


Introduction

J’ai récemment traversé une conduite sauvage mais gratifiante pour optimiser un flux de travail de réconciliation basé sur les pandas. Ce qui a commencé comme un morceau de code lent et maladroit mangeant 48 heuresa fini par être poli dans un maigre et méchant 6 minutes machine.
Je voulais partager mon histoire – non pas comme un tutoriel, mais comme une étude de cas du monde réel des tranchées. Si vous avez déjà senti que votre code Pandas prenait une éternité, ce post est pour vous.

Le problème: réconcilier d’énormesframes de données

Je travaillais avec deux grands pandas dataframes – appelons-les DF1 et DF2, et pour concilier dans deux catégories:

  1. Match exact
  2. Match basé sur la tolérance

Les clés de réconciliation étaient principalement PAN, Tan, Taxable_Value, TDS et Date, mais pour la deuxième catégorie, Taxable_Value & TDS correspond à une fenêtre de tolérance – pas une correspondance exacte. La tolérance aux dates était applicable dans les deux catégories (c’est-à-dire pas une correspondance d’égalité stricte à la date)

Voici à quoi cela ressemblait:

  • catégorie_1: groupé par [PAN, TAN, TAXABLE_VALUE, TDS]
  • catégorie_2: groupé par juste [PAN, TAN]puisque nous voulions plus de flexibilité avec Taxable_value & TDS Tolérance

Un aperçu de la complexité des données

Les données n’étaient pas seulement grandes – elle était désordonnée et déséquilibrée, ce qui a rendu le problème plus difficile:

  • 3,9 millions de lignes dans un dataframe (DF1) et 770 000 lignes dans l’autre DataFrame (DF2)
  • Entrées en double des deux côtés pour les mêmes combinaisons de pan
  • Plusieurs bracs sous une seule casserole, pas toujours alignés entre DF1 et DF2
  • Distribution asymétrique – Certains groupes pan-tan avaient des milliers de rangées, d’autres n’en avaient que quelques-uns

Cela a entraîné des explosions combinatoires lors de la correspondance des groupes en double – en particulier pendant la réconciliation basée sur la tolérance – et a rendu le multiprocessement naïf très inefficace.
Vous trouverez ci-dessous les échantillons de l’apparence des données réelles-

df1-

POÊLETANNÉTaxable_valueTDSdate
ABCDE1234FTan00110001002023-04-01
ABCDE1234FTan00110001002023-04-01
ABCDE1234FTan00220002002023-04-05
ABCDE1234FTan00230003002023-04-10

df2-

POÊLETANNÉTaxable_valueTDSdate
ABCDE1234FTan00110001002023-04-02
ABCDE1234FTan0019991012023-04-03
ABCDE1234FTan00219951982023-04-06
ABCDE1234FTan00440004002023-04-11

Première tentative

L’implémentation initiale avait deux méthodes distinctes, une pour chaque catégorie. Chacun s’est déroulé indépendamment dans sa propre piscine multiprocesseur. Je comptais fortement sur .Apply (lambda…) pour la logique comme la tolérance à la date, et la performance était… disons simplement, pas idéal.

def complex_check(row, category):
    .... .... ....
    time_delta = (dt.strptime(row[DATE_X], "%Y-%m-%d") - dt.strptime(row[DATE_Y], "%Y-%m-%d")).days

    if not time_delta or LOWER_BOUND <= time_delta <= UPPER_BOUND:
        return category
    return "X"
# Here, data and df3 are derived from df1 and df2, respectively.
.... .... ....
data.set_index(data[KEY_INDEX], inplace=True)
df3.set_index(df3[KEY_INDEX], inplace=True)
df3 = pd.merge(data, df3, left_index=True, right_index=True)
if df3.empty:
    continue
.... ... ....
df3[CATEGORY] = df3.apply(lambda record: complex_check(record, category), axis=1)
.... .... ....

Le parallélisme de la tâche est entraîné par les valeurs uniques dans la colonne KEY_INDEX de DF1. Explorons cela plus loin ci-dessous.

.... .... ....
df1.set_index(df1[KEY_INDEX], inplace=True)
df2.set_index(df2[KEY_INDEX], inplace=True)
jobs = []
with Pool(processes=ie.n_processes) as pool:
    for key_index in df1[KEY_INDEX].unique():
        data = df1[df1[KEY_INDEX] == key_index]
        df3 = df2[df2[KEY_INDEX] == key_index]
        if not data.empty and not df3.empty:
            jobs.append(pool.apply_async(partial(func, data, df3, category, ... .... ....)))
if jobs:
    df = pd.concat([job.get() for job in jobs])
.... .... ....
.... .... ....
Execution Time:
Category 1: 46.53 hours
Category 2: 1.40 hours
Diagramme de flux d'exécution: avant l'optimisation

Diagramme de flux d’exécution: avant l’optimisation

Refactorisation et optimisations

Je savais que je devais résoudre ce problème. J’ai donc adopté une approche progressive, en décomposant les goulots d’étranglement et en les plaquant un par un.

Diagramme de flux d'exécution: après optimisation

Diagramme de flux d’exécution: après optimisation

Cas 1: fusion des flux de travail parallèles

Problème: J’utilisais les deux catégories séparément, le duplication des efforts et le gaspillage des ressources.
Réparer: Au lieu de paralléliser les catégories, j’ai parallélisé chaque combinaison pan-tan, puis j’ai géré les deux catégories ensemble dans chaque processus.

# Here, we are determining how many tasks will be processed in parallel for the chunks of PAN-TAN.
.... .... ....
groups_df1 = dict(list(df1.groupby([PAN, TAN])))
groups_df2 = dict(list(df2.groupby([PAN, TAN])))
common_keys = groups_df1.keys() & groups_df2.keys()
max_workers = int(os.cpu_count() * 0.85)

manager = multiprocessing.Manager()
shared_list = manager.list()

grouped_task = [
    (shared_list, key, groups_df1.pop(key), groups_df2.pop(key), ..., ..., ....) for key in common_keys
]
with ProcessPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(process_pan_tan_chunk, grouped_task))

if results:
    .... .... .... 
    .... .... ....
del results, grouped_task, common_keys, groups_df1, groups_df2
.... .... ....
Result:
Execution time dropped from 46+ hours to 15 hours.
Not amazing, but hey — progress!

Cas 2: Ditching .Apply () pour la vectorisation

Problème: J’en faisais trop dans .Apply (), surtout autour de la tolérance à la date.
Réparer: J’ai réécrit ces pièces en utilisant des opérations vectorisées Pandas – des choses comme les gammes de diffusion et en utilisant .merge_asof () au lieu de vérifications par ligne.

# Here, Vectorized version of .apply(lambda .... ....)
.... .... ....
df3 = pd.merge(data, df3, left_index=True, right_index=True)
if df3.empty:
    continue

df3["delta_days"] = (pd.to_datetime(df3[DATE_X]) - pd.to_datetime(df3[DATE_Y])).dt.days
df3[CATEGORY] = np.where((df3["delta_days"] >= LOWER_BOUND) & (df3["delta_days"] <= UPPER_BOUND), category, "X")
.... .... ....
Result:
Boom - down to 2 hours. That’s a 7x improvement.
Vectorization is king in pandas, no kidding.

Cas 3: Chunking intelligent + équilibrage des ressources

Problème: Mes morceaux étaient inégaux. Certains processus avaient des charges de données massives; D’autres étaient inactifs.
Réparer: J’ai écrit un gestionnaire de morceaux dynamique qui s’est séparé Panoramique Groupes plus intelligemment basés sur la mémoire disponible et les noyaux CPU. Maintenant, plusieurs petits morceaux pourraient être gérés par processus sans mourir de faim ou de surcharge du système.

# Here, we finalize the chunk size and decide how many tasks will run in parallel, aiming for as balanced resource usage as possible.
.... .... ....
grouped_task = []
temp_df1 = None
temp_df2 = None
total_rows_df1 = sum(groups_df1[k].shape[0] for k in common_keys)
total_rows_df2 = sum(groups_df2[k].shape[0] for k in common_keys)
total_rows = max(total_rows_df1, total_rows_df2)
chunk_size = min(MIN_CHUCK_SIZE, total_rows // max_workers)

for key in common_keys:
    if temp_df1 is None:
        temp_df1 = groups_df1.pop(key)
        temp_df2 = groups_df2.pop(key)
    else:
        temp_df1 = pd.concat([temp_df1, groups_df1.pop(key)], ignore_index=True)
        temp_df2 = pd.concat([temp_df2, groups_df2.pop(key)], ignore_index=True)
    if temp_df1.shape[0]>=chunk_size or temp_df2.shape[0]>=chunk_size:
        grouped_task.append((shared_list, key, temp_df1, temp_df2, ..., ...., ....))
        temp_df1 = None
        temp_df2 = None
if temp_df1 is not None:
grouped_task.append((shared_list, key, temp_df1, temp_df2, ..., ..., ....))
temp_df1 = None
temp_df2 = None
.... .... ....
Result:
Execution time fell to just 6 minutes.
Yes, from 48 hours to 6 minutes. And no fancy tools -- just pandas, multiprocessing, and smart engineering.

Conclusion

  • Évitez l’utilisation de .Apply () à moins que vous ne le fassiez vraiment. C’est intuitif, mais lent pour le Big Data.
  • Chunchez vos données intelligemment – pensez en groupes, surtout si votre logique commerciale permet une séparation propre (comme Pan-Tan dans mon cas).
  • Ne pas parallélines – le multiprocessement est puissant, mais uniquement lorsque vous équilibrez la mémoire, les E / S et le CPU intelligemment.
  • Fusionner la logique connexe lorsque cela est possible pour éviter les charges de données redondantes
  • Le code vectorisé n’est pas plus rapide – il est plus propre et plus facile à entretenir.






Source link