
A Coding Guide to Building a Scalable End-to-End Machine Learning Data Pipeline Using Daft for High-Performance Structured and Image Data Processing

In this tutorial, we explore how to use Daft as a high-performance, Python-native data engine to build an end-to-end analytics pipeline. We start by loading a real-world MNIST dataset, then progressively transform it using UDFs, feature engineering, aggregations, joins, and lazy execution. We also show how to combine structured data processing, numerical computation, and machine learning in one workflow. By the end, we are not just manipulating data; we are building a complete, model-ready pipeline powered by Daft’s execution engine.

!pip -q install daft pyarrow pandas numpy scikit-learn


import os
os.environ["DO_NOT_TRACK"] = "true"


import numpy as np
import pandas as pd
import daft
from daft import col


print("Daft version:", getattr(daft, "__version__", "unknown"))


URL = "


df = daft.read_json(URL)
print("\nSchema (sampled):")
print(df.schema())


print("\nPeek:")
df.show(5)

We install Daft and its supporting libraries directly in Google Colab to ensure a clean, reproducible environment. We set the optional DO_NOT_TRACK environment variable and print the installed version to confirm that everything works. This gives us a stable foundation for the end-to-end data pipeline.

def to_28x28(pixels):
   arr = np.array(pixels, dtype=np.float32)
   if arr.size != 784:
       return None
   return arr.reshape(28, 28)


df2 = (
   df
   .with_column(
       "img_28x28",
       col("image").apply(to_28x28, return_dtype=daft.DataType.python())
   )
   .with_column(
       "pixel_mean",
       col("img_28x28").apply(lambda x: float(np.mean(x)) if x is not None else None,
                              return_dtype=daft.DataType.float32())
   )
   .with_column(
       "pixel_std",
       col("img_28x28").apply(lambda x: float(np.std(x)) if x is not None else None,
                              return_dtype=daft.DataType.float32())
   )
)


print("\nAfter reshaping + simple features:")
df2.select("label", "pixel_mean", "pixel_std").show(5)

We load a real-world MNIST JSON dataset directly from a remote URL using Daft’s native reader. We examine the schema and preview a few rows to understand the structure and column types. This lets us validate the dataset before applying transformations and feature engineering.
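The validation idea above can be sketched without Daft at all. The snippet below is a minimal illustration, assuming each JSON record has an "image" field with 784 flat pixel values and an integer "label" field (the two column names used in the pipeline code); the `sample` records and the `is_valid` helper are hypothetical.

```python
# Hypothetical two-record sample in the shape the pipeline expects:
# an "image" field with 784 flat pixel values and an integer "label".
sample = [
    {"image": [0] * 784, "label": 3},
    {"image": [0] * 10, "label": 7},  # malformed: wrong pixel count
]


def is_valid(record):
    """Return True when a record has 784 pixels and a digit label (0-9)."""
    image = record.get("image")
    label = record.get("label")
    return (
        isinstance(image, list)
        and len(image) == 784
        and isinstance(label, int)
        and 0 <= label <= 9
    )


# One flag per record; malformed rows can be dropped or inspected later.
valid_flags = [is_valid(r) for r in sample]
print(valid_flags)
```

A check like this is cheap to run on a handful of previewed rows before committing to the heavier transformations.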

@daft.udf(return_dtype=daft.DataType.list(daft.DataType.float32()), batch_size=512)
def featurize(images_28x28):
   out = []
   for img in images_28x28.to_pylist():
       if img is None:
           out.append(None)
           continue
       img = np.asarray(img, dtype=np.float32)
       row_sums = img.sum(axis=1) / 255.0
       col_sums = img.sum(axis=0) / 255.0
       total = img.sum() + 1e-6
       ys, xs = np.indices(img.shape)
       cy = float((ys * img).sum() / total) / 28.0
       cx = float((xs * img).sum() / total) / 28.0
       vec = np.concatenate([row_sums, col_sums, np.array([cy, cx, img.mean()/255.0, img.std()/255.0], dtype=np.float32)])
       out.append(vec.astype(np.float32).tolist())
   return out


df3 = df2.with_column("features", featurize(col("img_28x28")))


print("\nFeature column created (list[float]):")
df3.select("label", "features").show(2)

We reshape the raw pixel arrays into structured 28×28 images using a row-wise UDF. We compute statistical properties, such as the mean and standard deviation, to enrich the dataset. With these transformations, we turn raw image data into structured, model-friendly representations.
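The reshape-and-summarize step can be checked in isolation with plain NumPy. This is a minimal sketch that mirrors the `to_28x28` helper from the pipeline; the sample image (one fully white row) is a hypothetical input chosen so the expected mean is easy to reason about.

```python
import numpy as np


def to_28x28(pixels):
    """Reshape a flat sequence of 784 pixel values into a 28x28 float32 array."""
    arr = np.asarray(pixels, dtype=np.float32)
    if arr.size != 784:
        return None  # malformed row: signal "missing" instead of raising
    return arr.reshape(28, 28)


# Hypothetical sample: the first row is fully white (255), the rest is black.
flat = [255.0] * 28 + [0.0] * (784 - 28)
img = to_28x28(flat)

# Per-image summary statistics, as in the pixel_mean / pixel_std columns.
pixel_mean = float(np.mean(img))
pixel_std = float(np.std(img))

# A row with the wrong number of pixels yields None rather than an error.
bad = to_28x28([0.0] * 10)
print(img.shape, round(pixel_mean, 4), bad)
```

Returning None for malformed rows keeps the downstream lambdas simple, since they only need an `is not None` guard.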

label_stats = (
   df3.groupby("label")
      .agg(
          col("label").count().alias("n"),
          col("pixel_mean").mean().alias("mean_pixel_mean"),
          col("pixel_std").mean().alias("mean_pixel_std"),
      )
      .sort("label")
)


print("\nLabel distribution + summary stats:")
label_stats.show(10)


df4 = df3.join(label_stats, on="label", how="left")


print("\nJoined label stats back onto each row:")
df4.select("label", "n", "mean_pixel_mean", "mean_pixel_std").show(5)

We use a batch UDF to extract richer feature vectors from the reshaped images. We then compute grouped aggregations per label and join the summary statistics back onto each row to add context. This shows how we combine custom computation and analytics within Daft.
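To make the layout of the feature vector concrete, here is a minimal per-image sketch of the same computation in plain NumPy. The `featurize_one` name and the single-pixel test image are hypothetical; the arithmetic follows the batch UDF in the pipeline, which yields 28 row sums, 28 column sums, and 4 summary values (60 numbers in total).

```python
import numpy as np


def featurize_one(img):
    """Build a 60-dimensional feature vector from one 28x28 image."""
    img = np.asarray(img, dtype=np.float32)
    row_sums = img.sum(axis=1) / 255.0  # 28 values: ink per row
    col_sums = img.sum(axis=0) / 255.0  # 28 values: ink per column
    total = img.sum() + 1e-6            # guard against blank images
    ys, xs = np.indices(img.shape)
    cy = float((ys * img).sum() / total) / 28.0  # normalized vertical centroid
    cx = float((xs * img).sum() / total) / 28.0  # normalized horizontal centroid
    stats = np.array(
        [cy, cx, img.mean() / 255.0, img.std() / 255.0], dtype=np.float32
    )
    return np.concatenate([row_sums, col_sums, stats]).astype(np.float32)


# Hypothetical test image: a single white pixel at row 14, column 7.
img = np.zeros((28, 28), dtype=np.float32)
img[14, 7] = 255.0
vec = featurize_one(img)
print(vec.shape, float(vec[56]), float(vec[57]))
```

With a single lit pixel, the centroid entries land at the pixel's own coordinates divided by 28, which is a quick sanity check that the row and column axes are not swapped.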

small = df4.select("label", "features").collect().to_pandas()


small = small.dropna(subset=["label", "features"]).reset_index(drop=True)


X = np.vstack(small["features"].apply(np.array).values).astype(np.float32)
y = small["label"].astype(int).values


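from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report


# Hold out a stratified test set; test_size and random_state are illustrative choices.
X_train, X_test, y_train, y_test = train_test_split(
   X, y, test_size=0.2, random_state=42, stratify=y
)


clf = LogisticRegression(max_iter=1000, n_jobs=None)
clf.fit(X_train, y_train)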


pred = clf.predict(X_test)
acc = accuracy_score(y_test, pred)


print("\nBaseline accuracy (feature-engineered LogisticRegression):", round(acc, 4))
print("\nClassification report:")
print(classification_report(y_test, pred, digits=4))


out_df = df4.select("label", "features", "pixel_mean", "pixel_std", "n")
out_path = "/content/daft_mnist_features.parquet"
out_df.write_parquet(out_path)


print("\nWrote parquet to:", out_path)


df_back = daft.read_parquet(out_path)
print("\nRead-back check:")
df_back.show(3)

We convert the selected columns to pandas and train a baseline Logistic Regression model. We evaluate its accuracy to check that the engineered features are useful. We also persist the processed dataset in Parquet format, completing the end-to-end pipeline from raw data ingestion to production-ready storage.
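The hand-off from list-valued feature columns to a dense matrix is worth seeing on its own. The sketch below is a simplified illustration with hypothetical three-element feature vectors (the real ones have 60 values): it drops rows with a missing label or feature vector, then stacks the rest into the `X` and `y` arrays a scikit-learn estimator expects.

```python
import numpy as np

# Hypothetical rows as they might look after materializing the DataFrame.
rows = [
    {"label": 0, "features": [0.1, 0.2, 0.3]},
    {"label": 1, "features": None},             # image that failed to reshape
    {"label": 1, "features": [0.4, 0.5, 0.6]},
    {"label": None, "features": [0.7, 0.8, 0.9]},
]

# Keep only rows where both the label and the feature vector are present.
clean = [r for r in rows if r["label"] is not None and r["features"] is not None]

# Stack the per-row lists into a 2-D float32 matrix, one row per example.
X = np.vstack([np.asarray(r["features"], dtype=np.float32) for r in clean])
y = np.asarray([int(r["label"]) for r in clean])
print(X.shape, y.tolist())
```

Filtering before stacking matters because `np.vstack` cannot combine None entries with numeric rows.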

In this tutorial, we built a production-style data workflow using Daft, from raw JSON ingestion through feature engineering, aggregation, model training, and Parquet persistence. We showed how to combine custom UDF logic, grouped aggregations, and joins, and how to feed the results into downstream machine learning, all within a clean, scalable framework. Along the way, we saw how Daft lets us handle complex transformations while remaining Pythonic and efficient. We ended up with a reusable, end-to-end pipeline that shows how modern data engineering and machine learning workflows can be combined in a unified environment.




Michal Sutter is a data science expert with a Master of Science in Data Science from the University of Padova. With a strong foundation in statistical analysis, machine learning, and data engineering, Michal excels at turning complex data sets into actionable insights.
