Structured Datasets: Core Concepts and Transformations
Structured Datasets provide a robust and efficient framework for managing, manipulating, and analyzing tabular data. This system is designed to handle diverse data sources, enforce data integrity through schemas, and enable complex transformations with optimized performance.
Purpose
The primary purpose of Structured Datasets is to offer a unified, programmatic interface for working with structured data, abstracting away the complexities of underlying storage formats and execution engines. It empowers developers to define, transform, and analyze datasets with strong typing, predictable behavior, and scalable performance, facilitating data preparation for analytics, machine learning, and reporting.
Core Features
The Structured Datasets system offers several core features that streamline data processing workflows:
- Schema Enforcement and Inference: Datasets are inherently schema-aware. The system supports explicit schema definition, ensuring data types and column names conform to expectations. It can also infer schemas from various data sources, providing a starting point for validation and transformation.
- Immutable Transformations: All transformation operations on a StructuredDataset instance produce a new dataset, preserving the original. This immutability simplifies debugging, enables reproducible workflows, and supports lazy evaluation strategies.
- Optimized Execution Engine: An underlying execution engine optimizes transformation chains. Under lazy evaluation, it constructs an execution plan that can be optimized for performance, potentially pushing down operations to the data source or leveraging parallel processing.
- Rich Transformation API: A comprehensive set of operations allows for filtering, selecting, aggregating, joining, sorting, and mapping data. These operations are designed to be expressive and composable, enabling complex data pipelines.
- Extensible Data Sources and Sinks: The system provides connectors for reading data from and writing data to various formats and systems, including CSV, Parquet, JSON, and relational databases.
Key Concepts
Structured Dataset Definition
A StructuredDataset represents a collection of data organized into named columns, each with a defined data type. It is conceptually similar to a table in a relational database or a DataFrame in data analysis libraries.
Creating a dataset typically involves loading data from a source or constructing it programmatically:
from structured_datasets import DatasetBuilder, Schema, Field, DataType
# Define a schema
user_schema = Schema([
Field("user_id", DataType.INT),
Field("username", DataType.STRING),
Field("email", DataType.STRING),
Field("age", DataType.INT),
Field("is_active", DataType.BOOLEAN)
])
# Build a dataset from a list of dictionaries
data = [
{"user_id": 1, "username": "alice", "email": "alice@example.com", "age": 30, "is_active": True},
{"user_id": 2, "username": "bob", "email": "bob@example.com", "age": 24, "is_active": False},
{"user_id": 3, "username": "charlie", "email": "charlie@example.com", "age": 35, "is_active": True},
]
users_dataset = DatasetBuilder.from_records(data, schema=user_schema)
print(users_dataset.head(2))
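Schema inference, mentioned under Core Features, offers an alternative starting point when an explicit schema is not yet available. The sketch below assumes from_records() can also be called without a schema so that field names and types are derived from the records themselves; that calling convention is an assumption and is not demonstrated elsewhere in this guide.
# Assumed usage: omit the schema and let the builder infer field names
# and types from the sample records.
inferred_dataset = DatasetBuilder.from_records(data)
print(inferred_dataset.schema)  # expected to resemble user_schema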
Schema Management
The Schema defines the structure of a StructuredDataset, specifying column names and their corresponding DataType. This is crucial for data validation, type safety, and optimizing storage and processing.
# Accessing the schema of an existing dataset
current_schema = users_dataset.schema
print(current_schema)
# Adding a new field to a schema (for future dataset creation or validation)
extended_schema = current_schema.add_field(Field("last_login", DataType.TIMESTAMP))
print(extended_schema)
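To show how a schema can drive lightweight validation, the following sketch assumes the Schema object exposes its Field list as .fields and each Field exposes .name; both attribute names are assumptions rather than documented API.
# Hypothetical record-level check: compare a record's keys against the
# schema's declared field names (.fields and .name are assumed attributes).
record = {"user_id": 4, "username": "dana", "email": "dana@example.com", "age": 29, "is_active": True}
declared = {field.name for field in current_schema.fields}
print("missing fields:", sorted(declared - record.keys()))
print("unexpected fields:", sorted(record.keys() - declared))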
Data Representation
Internally, StructuredDataset instances often leverage columnar storage for efficiency, especially during analytical operations. This means data for a single column is stored contiguously, which improves cache locality and enables vectorized operations. While the user interacts with a logical table, the underlying implementation optimizes for performance based on this representation.
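The column-oriented view is also visible in the API: the column() accessor used throughout this guide returns a single column that can be transformed as a unit rather than row by row. A small sketch follows; calling to_list() on a column to materialize it is an assumption, since only column(), map(), and alias() appear in the examples below.
# Work column-wise: grab one column and transform it as a unit.
# to_list() as a column-level action is assumed, not documented here.
ages = users_dataset.column("age")
print(ages.map(lambda age: age + 1).to_list())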
Transformation Operations
Transformations are operations that take a StructuredDataset and produce a new one, leaving the input unchanged. They are declarative: developers specify what to do, not how to do it, and the execution engine determines the most efficient way to perform the operation.
Common transformation categories include:
- Selection/Projection: Choosing specific columns or creating new ones.
- Filtering: Subsetting rows based on conditions.
- Aggregation: Summarizing data (e.g., sum, average, count).
- Joining: Combining datasets based on common keys.
- Sorting: Ordering rows by column values.
- Mapping: Applying a function to transform column values or create new columns.
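Because these operations are composable, several categories can be chained into a single declarative pipeline. The sketch below uses only calls that appear in the examples later in this guide; sorting by an aggregation alias is assumed to behave like sorting by any other column.
# Filter, aggregate, and sort in one declarative chain; the engine decides
# how to execute it. Reuses users_dataset from the earlier example.
adult_summary = users_dataset.filter(lambda row: row["age"] >= 25) \
    .groupBy("is_active") \
    .agg(users_dataset.column("age").avg().alias("average_age")) \
    .sort("average_age", ascending=False)
print(adult_summary.collect())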
Execution Models (Eager vs. Lazy)
The system supports both eager and lazy execution models, with lazy evaluation being the default for most complex operations to maximize performance.
- Eager Execution: Operations are executed immediately, and results are computed and materialized in memory. This is suitable for smaller datasets or when intermediate results are explicitly needed.
- Lazy Execution: Operations build an execution plan without immediately computing results. The actual computation is deferred until an action (such as collect(), to_list(), or write_parquet()) is called. This allows the optimizer to reorder, combine, and prune operations for maximum efficiency, especially with large datasets.
# Example of lazy evaluation
lazy_dataset = users_dataset.filter(lambda row: row["age"] > 25) \
.select("username", "age") \
.sort("age", ascending=False)
# No computation happens yet. The plan is built.
print("Execution plan built, no data processed yet.")
# Action: Trigger computation and collect results
active_users_over_25 = lazy_dataset.collect()
print("\nCollected results:")
for row in active_users_over_25:
print(row)
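For contrast, an eager-style flow can be approximated by invoking an action immediately after a transformation, so each intermediate result is materialized. This sketch reuses filter() and collect() from above; whether the library exposes a dedicated eager-mode switch is not covered in this guide.
# Eager-style usage: materialize the result right away instead of building
# up a plan. Assumes collect() returns a list, as in the example above.
adults = users_dataset.filter(lambda row: row["age"] > 25).collect()
print("Materialized rows:", len(adults))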
Common Transformations
Filtering and Projection
Filtering selects rows that satisfy a given condition, while projection selects specific columns or creates new ones.
# Filter for active users
active_users = users_dataset.filter(lambda row: row["is_active"])
print("Active Users:")
print(active_users.head(5))
# Select only username and email, and add a new column 'domain'
projected_dataset = users_dataset.select(
"username",
"email",
users_dataset.column("email").map(lambda email: email.split('@')[1]).alias("domain")
)
print("\nProjected Dataset with Domain:")
print(projected_dataset.head(5))
Aggregation
Aggregation operations summarize data, often after grouping rows by one or more columns.
# Group by 'is_active' and count users, calculate average age
summary_dataset = users_dataset.groupBy("is_active").agg(
users_dataset.column("user_id").count().alias("user_count"),
users_dataset.column("age").avg().alias("average_age")
)
print("\nUser Summary by Activity:")
print(summary_dataset.collect())
Joining Datasets
Joining combines two datasets based on common columns, similar to SQL JOIN operations.
# Assume another dataset for orders
orders_schema = Schema([
Field("order_id", DataType.INT),
Field("user_id", DataType.INT),
Field("amount", DataType.FLOAT)
])
orders_data = [
{"order_id": 101, "user_id": 1, "amount": 50.0},
{"order_id": 102, "user_id": 3, "amount": 120.5},
{"order_id": 103, "user_id": 1, "amount": 75.0},
{"order_id": 104, "user_id": 2, "amount": 30.0},
]
orders_dataset = DatasetBuilder.from_records(orders_data, schema=orders_schema)
# Perform an inner join
users_with_orders = users_dataset.join(orders_dataset, on="user_id", how="inner")
print("\nUsers with Orders (Inner Join):")
print(users_with_orders.head(5))
Mapping and Custom Operations
The mapRows() and withColumn() methods allow for applying custom logic to transform data or create new columns.
# Using mapRows to transform each row
transformed_users = users_dataset.mapRows(
lambda row: {
"full_name": row["username"].upper(),
"age_group": "Adult" if row["age"] >= 18 else "Minor"
},
output_schema=Schema([
Field("full_name", DataType.STRING),
Field("age_group", DataType.STRING)
])
)
print("\nTransformed Users (mapRows):")
print(transformed_users.head(5))
# Using withColumn to add a new column based on existing ones
users_with_status = users_dataset.withColumn(
"status",
users_dataset.column("is_active").map(lambda active: "Active" if active else "Inactive")
)
print("\nUsers with Status (withColumn):")
print(users_with_status.head(5))
Windowing Functions
Windowing functions perform calculations across a set of rows related to the current row, often used for ranking, moving averages, or cumulative sums.
# Assume a dataset with sales data, partitioned by product and ordered by date
sales_schema = Schema([
Field("product_id", DataType.INT),
Field("sale_date", DataType.DATE),
Field("revenue", DataType.FLOAT)
])
sales_data = [
{"product_id": 1, "sale_date": "2023-01-01", "revenue": 100.0},
{"product_id": 1, "sale_date": "2023-01-02", "revenue": 150.0},
{"product_id": 2, "sale_date": "2023-01-01", "revenue": 200.0},
{"product_id": 1, "sale_date": "2023-01-03", "revenue": 120.0},
{"product_id": 2, "sale_date": "2023-01-02", "revenue": 220.0},
]
sales_dataset = DatasetBuilder.from_records(sales_data, schema=sales_schema)
# Calculate a running total of revenue per product
from structured_datasets import Window, PartitionBy, OrderBy, Sum
window_spec = Window.partitionBy(PartitionBy("product_id")).orderBy(OrderBy("sale_date"))
sales_with_running_total = sales_dataset.withColumn(
"running_revenue",
Sum("revenue").over(window_spec)
)
print("\nSales with Running Total Revenue:")
print(sales_with_running_total.sort("product_id", "sale_date").collect())
Integration and Data Flow
Data Ingestion
The system provides read_ methods for ingesting data from various sources. These methods often infer the schema or allow explicit schema definition.
# Reading from a CSV file
# Assuming 'data.csv' exists with user data
# users_from_csv = DatasetBuilder.read_csv("data.csv", schema=user_schema)
# Reading from a Parquet file
# users_from_parquet = DatasetBuilder.read_parquet("users.parquet")
Data Export
Transformed datasets can be exported to different formats or systems using write_ methods.
# Writing to a Parquet file
# users_dataset.write_parquet("processed_users.parquet", mode="overwrite")
# Writing to a CSV file
# users_dataset.write_csv("processed_users.csv", header=True)
Performance Considerations
- Lazy Evaluation: Leverage lazy evaluation by chaining transformations. The optimizer can then reorder operations, push down filters, and combine projections to minimize data movement and computation.
- Columnar Operations: Prefer built-in columnar operations (e.g., filter(), select(), groupBy()) over row-wise mapRows() when possible, as they are often highly optimized.
- Schema Definition: Providing an explicit schema during ingestion or transformation helps the system optimize memory allocation and type checking.
- Data Locality: When integrating with distributed systems, consider data partitioning and co-location for joins and aggregations to minimize network transfer.
- Memory Management: For very large datasets, be mindful of actions such as collect(), which materialize the entire dataset in memory. Use iterative processing or write directly to external storage when possible (see the sketch after this list).
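As a minimal sketch of the memory-management point above, results can be written straight to storage instead of being collected. write_parquet() is described under Data Export; large_dataset is a hypothetical stand-in for any dataset too big to materialize.
# Stream filtered results directly to Parquet rather than collecting them.
# large_dataset is hypothetical; write_parquet() appears in Data Export.
# large_dataset.filter(lambda row: row["is_active"]) \
#     .write_parquet("active_subset.parquet", mode="overwrite")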
Limitations and Best Practices
- Schema Evolution: While schemas are robust, significant schema changes mid-pipeline can require careful handling. Plan for schema evolution by using flexible data types or versioning schemas.
- Custom Logic Performance: Custom Python functions used in mapRows() or map() on columns can be less performant than built-in operations, especially for large datasets, as they may not benefit from the underlying engine's optimizations. Express logic through the declarative API where possible.
- Error Handling: Implement robust error handling, especially during data ingestion and schema validation, to catch malformed data early (a minimal sketch follows this list).
- Resource Management: When operating on large datasets, ensure sufficient memory and CPU resources are available, particularly for actions that trigger computation.
- Testing: Thoroughly test transformation pipelines with representative data to ensure correctness and performance.
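As a minimal illustration of the error-handling point above, ingestion can be wrapped in a defensive block. The exception type caught here is an assumption; the guide does not document the library's error classes, so substitute the actual validation error it raises.
# Defensive ingestion sketch. ValueError stands in for whatever exception
# the library raises on schema violations or malformed records.
try:
    validated_users = DatasetBuilder.from_records(data, schema=user_schema)
except ValueError as exc:
    print(f"Rejected batch due to schema violation: {exc}")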
Common Use Cases
- ETL (Extract, Transform, Load) Pipelines: Ingesting raw data from various sources, cleaning and transforming it, and loading it into data warehouses or data lakes for analytics.
- Feature Engineering for Machine Learning: Preparing datasets by creating new features, handling missing values, scaling, and encoding categorical variables before training machine learning models.
- Data Aggregation and Reporting: Summarizing transactional data, calculating key performance indicators (KPIs), and generating reports for business intelligence.
- Data Validation and Quality Checks: Applying schema validation and custom rules to ensure data integrity and identify anomalies.
- Data Integration: Combining data from disparate systems (e.g., CRM, ERP, web analytics) into a unified view for comprehensive analysis.