The Future of Data is a rude awakening for the Modern Data Stack
What "good" looks like in 2 years
Introduction
While Snowflake and Databricks dominate the headlines, the quieter older brother made the front page earlier this week.
BigQuery finally released some numbers that indicate usage. Apparently, they have 5 times as many customers as their next-biggest competitor. Snowflake reportedly have over 10k customers, which puts customer numbers for the GCP warehouse at about 55–60k. Impressive.
GCP did two things:
Focus: they built their offerings around their main product, BigQuery
Simplicity: they focussed on making things radically simple to use, but also powerful
Take Dataform as an example. You can easily create SQL-based pipelines in GCP without having to learn a new framework like dbt or faff around with local development. It’s basically free, and it’s just there in the UI, which is pretty cool.
Interestingly, you never really see storage in BigQuery. You just interact with it through different applications in the GCP ecosystem. Now, with its support for Iceberg, the experience is basically the same.
This is in stark contrast to the Modern Data Stack world, where practitioners rely on data warehouses like Snowflake or Databricks but then have a host of other applications to play around with too.
Indeed, over 70% of practitioners think things are too complex, and that includes folks on GCP. While GCP have a great model and huge distribution advantages, a lot of what they offer is fundamentally just not as good as the alternatives, so people decide the juice is worth the squeeze and don't go "full GCP".
Iceberg has the potential to make things radically simpler for organisations, but it leaves a massive, glaring hole around user experience. Warehouses offer more than a commoditisable source of compute: a way to query data, security and governance/RBAC, quality checks, workflows, and so on. Where should all of this live in a post-MDS age?
In this post, we’ll show how you can build a lakehouse with Orchestra and Bauplan. Bauplan is used as the compute engine for managing tables on iceberg. Orchestra is used for everything else — it is the single pane of glass for building, running and deploying Data and AI workflows. It leaves a couple of holes, which we’ll get to at the end.
Let’s dive in.
End-to-end Lakehouse pattern with Bauplan and Orchestra
If you have data in iceberg, then all you need to do is have a way to manipulate it.
This is non-trivial: for Iceberg to work, you can't just write the data; you also need to write the metadata to a catalog. For a comparison of what data catalogs are, see this excellent article from Kyle Weller.
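To make the catalog point concrete, here's a rough sketch of what writing to Iceberg "by hand" can look like with pyiceberg, which is not part of the stack below; the catalog URI, warehouse path, table name and schema are purely illustrative. Every append has to go through a catalog that commits new metadata and snapshots, and that is exactly the plumbing Bauplan handles for you.
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# connect to an Iceberg catalog (URI and warehouse are illustrative)
catalog = load_catalog(
    "demo",
    uri="https://my-catalog.example.com",
    warehouse="s3://my-bucket/warehouse",
)

# the catalog owns the table metadata: schema, snapshots, partition spec
table = catalog.create_table_if_not_exists(
    "raw.shipments",
    schema=pa.schema([("id", pa.string()), ("status", pa.string())]),
)

# appending writes Parquet data files to object storage *and*
# commits a new snapshot to the catalog
table.append(pa.table({"id": ["SHP001"], "status": ["In Transit"]}))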
This pattern deploys a “simple data stack”. Orchestra is used for landing data and co-ordinating jobs in Bauplan. S3 is used as the storage layer. Bauplan provides the catalog and compute for transforming data on S3.
Landing Data
To land data, we'll do something very basic: pull a file from an SFTP server and drop it in S3. You can see the code below.
import os
from unittest import mock
import paramiko
import requests
import json
import boto3
import datetime
import time
def upload_file_to_s3(file_location,bucket_name,key):
s3 = boto3.client('s3')
output = s3.upload_file(file_location,bucket_name,key)
print(output)
def list_s3_objects(bucket_name):
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket_name)
for my_bucket_object in my_bucket.objects.all():
print(my_bucket_object)
def set_orchestra_output(output_name: str, output_value:str):
"""
Use this function to set an output on the Task Run that can be used by downstream tasks
"""
data = {
"event_type": "SET_OUTPUT",
"task_run_id": os.getenv('ORCHESTRA_TASK_RUN_ID'), # Set Automatically
"output_name": output_name,
"output_value": output_value
}
url = "https://webhook.getorchestra.io"
api_key = os.getenv('ORCHESTRA_API_KEY') ## Set in Orchestra Python Connection
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
response = requests.post(url, headers=headers, data=json.dumps(data))
# Print the response
print(response.status_code)
print(response.text)
    if response.status_code != 200:
        raise Exception(f"Failed to set Orchestra output: {response.status_code} {response.text}")
def connect_sftp(host, port, username, password):
"""Establish and return a mocked SFTP connection"""
transport = paramiko.Transport((host, port))
transport.connect(username=username, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
return sftp, transport
def list_files(sftp, remote_dir):
"""List all files in the given remote directory"""
try:
files = sftp.listdir(remote_dir)
print(f"Files in '{remote_dir}':")
for f in files:
print(f" - {f}")
return files
except FileNotFoundError:
print(f"Directory not found: {remote_dir}")
return []
def download_file(sftp, remote_path, local_path):
"""Download a file from the remote path to the local path"""
try:
with sftp.open(remote_path, 'r') as remote_file:
content = remote_file.read()
with open(local_path, 'wb') as f:
f.write(content)
print(f"Downloaded: {remote_path} → {local_path}")
except FileNotFoundError:
print(f"File not found: {remote_path}")
def mock_sftp():
# ==== CONFIGURATION ====
host = "mock_host"
port = 22
username = "mock_user"
password = "mock_pass"
remote_directory = "/mock/remote"
file_to_download = os.getenv('FILE_PATH')
local_download_path = os.path.join(os.getcwd(), file_to_download)
# ==== MOCKING START ====
with mock.patch('paramiko.Transport') as MockTransport, \
mock.patch('paramiko.SFTPClient.from_transport') as MockSFTPClient:
mock_transport_instance = mock.MagicMock()
MockTransport.return_value = mock_transport_instance
mock_sftp_instance = mock.MagicMock()
MockSFTPClient.return_value = mock_sftp_instance
# Mock listdir return
mock_sftp_instance.listdir.return_value = ["shipments.json", "another_file.csv"]
# Mock file reading — JSON with sample shipment data
mock_shipments_data = {
"shipments": [
{"id": "SHP001", "origin": "New York", "destination": "Chicago", "status": "In Transit"},
{"id": "SHP002", "origin": "Los Angeles", "destination": "San Francisco", "status": "Delivered"},
{"id": "SHP003", "origin": "Miami", "destination": "Houston", "status": "Pending"}
]
}
json_bytes = json.dumps(mock_shipments_data, indent=2).encode('utf-8')
mock_file = mock.MagicMock()
mock_file.read.return_value = json_bytes
mock_sftp_instance.open.return_value.__enter__.return_value = mock_file
try:
# Connect (mocked)
sftp, transport = connect_sftp(host, port, username, password)
# 1. List files (mocked)
list_files(sftp, remote_directory)
# 2. Download file (mocked read and local write)
full_remote_path = f"{remote_directory}/{file_to_download}"
print("Fetching file from: " + str(full_remote_path))
download_file(sftp, full_remote_path, local_download_path)
print("Downloading File Complete")
date__ = str(datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d'))
filename = "sftp/"+date__+"/"+date__+os.getenv('FILE_PATH')
upload_file_to_s3(os.getenv('FILE_PATH'), "orchestra-databricks", filename)
print("File uploaded to S3")
# Clean up (mocked)
sftp.close()
transport.close()
# Output S3-style mock path
set_orchestra_output("s3_path", filename)
except Exception as e:
set_orchestra_output("error", str(e))
print(f"Error: {e}")
if __name__ == "__main__":
mock_sftp()
Landing Data using Bauplan and Iceberg
The next step is to turn this raw JSON in S3 into an Iceberg table. You cannot currently do this "in flight", which might be preferable; you need to land it in S3 first. That makes this layer a "RAW" layer, so we can run some quality checks before we write to Iceberg.
You can see the code we use with Bauplan and Orchestra below.
"""
This is a simple stand-alone script that runs a WAP ingestion pipeline: the
script is meant to be picked up by an Orchestra flow, and it uses Bauplan
as a programmable lakehouse to perform the ingestion and quality checks
in pure Python.
Note how much lighter the integration is compared to other datalake tools ;-)
"""
import bauplan
import os
def source_to_iceberg_table(
bauplan_client: bauplan.Client,
table_name: str,
namespace: str,
source_s3_pattern: str,
bauplan_ingestion_branch: str
):
"""
Wrap the table creation and upload process in Bauplan.
"""
# if the branch already exists, we delete it and create a new one
if bauplan_client.has_branch(bauplan_ingestion_branch):
bauplan_client.delete_branch(bauplan_ingestion_branch)
# create the branch from main
bauplan_client.create_branch(bauplan_ingestion_branch, from_ref='main')
# create namespace if it doesn't exist
if not bauplan_client.has_namespace(namespace, ref=bauplan_ingestion_branch):
bauplan_client.create_namespace(namespace, branch=bauplan_ingestion_branch)
# now we create the table in the branch
bauplan_client.create_table(
table=table_name,
search_uri=source_s3_pattern,
namespace=namespace,
branch=bauplan_ingestion_branch,
# just in case the test table is already there for other reasons
replace=True
)
# now we import the data
is_imported = bauplan_client.import_data(
table=table_name,
search_uri=source_s3_pattern,
namespace=namespace,
branch=bauplan_ingestion_branch
)
return is_imported
def run_quality_checks(
bauplan_client: bauplan.Client,
bauplan_ingestion_branch: str,
namespace: str,
table_name: str
) -> bool:
"""
We check the data quality by running the checks in-process: we use
Bauplan SDK to query the data as an Arrow table, and check if the
target column is not null through vectorized PyArrow operations.
"""
    # we retrieve the data and check whether the column has any nulls
# make sure the column you're checking is in the table, so change this appropriately
# if you're using a different dataset
column_to_check = 'passenger_count'
    # NOTE: you can interact with the lakehouse in pure Python (no SQL)
    # and still get back an Arrow table (just this one column) through a performant scan.
wap_table = bauplan_client.scan(
table=table_name,
ref=bauplan_ingestion_branch,
namespace=namespace,
columns=[column_to_check]
)
    # we return a boolean: True if the quality check passed (no nulls), False otherwise
    return wap_table[column_to_check].null_count == 0
def wap_with_bauplan():
"""
Run the WAP ingestion pipeline using Bauplan in Orchestra.
"""
# get some vars from orchestra environment
s3_path = 's3://alpha-hello-bauplan/green-taxi/*.parquet'
bauplan_api_key = os.getenv("BAUPLAN_API_KEY")
# start the Bauplan client
bauplan_client = bauplan.Client(api_key=bauplan_api_key)
username = bauplan_client.info().user.username
# change the vars here if you wish to use a different table / namespace
ingestion_branch = f'{username}.wap_with_orchestra'
table_name = 'trip_wap'
namespace = 'orchestra'
### THIS IS THE WRITE
# first, ingest data from the s3 source into a table the Bauplan branch
source_to_iceberg_table(
bauplan_client,
table_name,
namespace,
s3_path,
ingestion_branch
)
### THIS IS THE AUDIT
# we query the table in the branch and check we have no nulls
is_check_passed = run_quality_checks(
bauplan_client,
ingestion_branch,
namespace=namespace,
table_name=table_name
)
assert is_check_passed, 'Quality checks failed'
# THIS IS THE PUBLISH
# finally, we merge the branch into the main branch if the quality checks passed
bauplan_client.merge_branch(
source_ref=ingestion_branch,
into_branch='main'
)
bauplan_client.delete_branch(ingestion_branch)
return
if __name__ == "__main__":
wap_with_bauplan()
Bauplan code summary:
The script sends a request to Bauplan which is a cloud-hosted service linked to runners in your/my infrastructure. The first function
source_to_iceberg_table
copies data from the S3 path where we landed the SFTP data and creates an Iceberg table. Bauplan is basically providing the catalog here and writing to your S3. This is done in a temporary area which Bauplan abstracts into the idea of a branch, so you're not actually updating "production" yet. You're still writing data in the same AWS account, and even the same "table", but in a different version, similar to what happens to your repo when you switch branches in Git! Later, we will merge this version to make it "production" and delete the temporary branch. Because branching doesn't copy data, you shouldn't be sat waiting ages for feedback even on massive Iceberg tables.
We then run a quality check. Crucially, you are not actually crunching any data on your computer. A quality check could be massive: what if you wanted to scan a full table and check rows were unique? Running this code sends the request to Bauplan, which crunches the data in its infrastructure and returns the results to you (technically a server-side scan with column pruning pushed down, after which the test is evaluated on the returned Arrow table). A sketch of such a uniqueness check follows after this list.
If the quality checks pass, we
merge
the branch, which basically promotes the new table state to the production branch and then deletes the temporary one. As before, data does not get copied, so it is much faster.
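Here's a rough sketch of what that uniqueness check could look like, reusing the same scan pattern as the pipeline above; the function, column and ref names are my own illustrative choices, and counting distinct values is just one way to do it.
import bauplan
import pyarrow.compute as pc

def check_column_is_unique(
    client: bauplan.Client,
    table_name: str,
    namespace: str,
    ref: str,
    column: str,
) -> bool:
    # scan only the column we need; the heavy lifting happens on Bauplan's runners
    tbl = client.scan(table=table_name, ref=ref, namespace=namespace, columns=[column])
    # vectorised PyArrow check: the number of distinct values should equal the row count
    return pc.count_distinct(tbl[column]).as_py() == tbl.num_rows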
Now we’ve moved our data to the raw layer, quality checked it, and moved it to iceberg. The next step is to join multiple models together and start having some fun.
Transforming Iceberg Data using Bauplan and Orchestra
This is where Orchestra comes in really handy. Running Python workflows is fine when they're linear, but what about when you need to run a complex web of dependencies?
The answer here is to use an orchestrator. By defining your logic in Orchestra, you can easily trigger the different workflows when you need them.
Let’s see how that looks.
Declaratively modelling data using Bauplan
The code for declaratively modelling data in S3 using Bauplan is below.
"""
This script collects Bauplan models, i.e. transformations that are run in Python mapping an
input table (data=bauplan.Model), to another table (a dataframe-like object we return).
Note that collecting models in a single file called models.py is not required, but we find it useful
to keep the pipeline code together.
"""
# import bauplan to get the decorators available
import bauplan
# this decorator tells Bauplan that this function has the model semantics - input: table, output: table.
# the input is always an Arrow table, output can be an Arrow table, a pandas dataframe or a list of dictionaries.
# the materialize flag is used to tell the system whether to persist the output of a function as an Iceberg table in the data catalog
@bauplan.model()
# this decorator allows you to specify the Python version and any pip packages you need for this function
# remember that the environment for each function is entirely separated
# e.g. different functions can run with different packages, different versions of the same packages
# and/or even different versions of the python interpreter
@bauplan.python('3.11')
def trips_and_zones_2(
trips=bauplan.Model(
'trip_wap',
# this function performs an S3 scan directly in Python, so we can specify the columns and the filter pushdown
# by pushing the filters down to S3 we make the system considerably more performant
columns=[
'pickup_datetime',
'dropoff_datetime',
'PULocationID',
'DOLocationID',
'trip_miles',
'trip_time',
'base_passenger_fare',
'tolls',
'sales_tax',
'tips',
],
filter="pickup_datetime >= '2022-12-15T00:00:00-05:00' AND pickup_datetime < '2023-01-01T00:00:00-05:00'"
),
zones=bauplan.Model(
'taxi_zones',
),
):
# the following code is PyArrow https://arrow.apache.org/docs/python/index.html
# because Bauplan speaks Arrow natively you don't need to import PyArrow explicitly
# join 'trips' with 'zones' on 'PULocationID'
pickup_location_table = trips.join(zones, 'PULocationID', 'LocationID').combine_chunks()
return pickup_location_table
@bauplan.model()
# this time notice that we specify one dependency, namely Pandas 1.5.3.
@bauplan.python('3.11', pip={'pandas': '1.5.3', 'numpy': '1.23.2'})
def normalized_taxi_trips(
data=bauplan.Model(
        # this function takes the previous one 'trips_and_zones_2' as an input
# functions are chained together to form a DAG by naming convention
'trips_and_zones_2',
)
):
import pandas as pd
import numpy as np
import math
# print some debug info - you will see every print statement directly in your terminal
size_in_gb = round(data.nbytes / math.pow(1024, 3), 3)
print(f"\nThis table is {size_in_gb} GB and has {data.num_rows} rows\n")
# convert data from Arrow to Pandas
df = data.to_pandas()
# create time filter on datetime UTC
time_filter = pd.to_datetime('2022-01-01')
time_filter_utc = time_filter.tz_localize('UTC')
# filter df by timestamp
df = df[df['pickup_datetime'] >= time_filter_utc]
# exclude rows with trip_miles = 0
df = df[df['trip_miles'] > 0.0]
# exclude rows with trip_miles > 200
df = df[df['trip_miles'] < 200.0]
    # create a new column with log-transformed trip_miles to better model the skewed distribution
df['log_trip_miles'] = np.log10(df['trip_miles'])
# return a Pandas dataframe
return df
Basically, every function defines a table: the function name becomes the output table, the decorators configure the runtime, and other tables are passed in as parameters.
If you do bauplan run
then it will, based on the relevant configs, write these dataframes to S3 as Iceberg tables and register the metadata in Bauplan's catalog.
I did not experiment with folder structure.
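To extend the DAG, you would just add another decorated function that names normalized_taxi_trips as its input. Here's a rough sketch; the model name, the Zone column and the aggregation are my own illustrative choices, not part of the pipeline above.
import bauplan

@bauplan.model()
@bauplan.python('3.11')
def trips_per_pickup_zone(
    # chained to the previous model purely by naming convention
    data=bauplan.Model('normalized_taxi_trips')
):
    # 'data' arrives as an Arrow table; group by pickup zone and count trips
    # ('Zone' is assumed to survive the taxi_zones join upstream)
    return data.group_by('Zone').aggregate([('log_trip_miles', 'count')])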
End-to-end workflow
This end-to-end workflow is super simple in Orchestra
version: v1
name: bauplans
pipeline:
9e4c41ab-ca67-4b74-ad96-ad8e7aba106b:
tasks:
9c171462-e255-45b5-b166-76e16bcb4311:
integration: PYTHON
integration_job: PYTHON_EXECUTE_SCRIPT
parameters:
command: python -m scan
package_manager: PIP
python_version: '3.12'
build_command: pip install -r requirements.txt
project_dir: bauplan
shallow_clone_dirs: bauplan
set_outputs: false
depends_on: []
condition: null
name: run bauplan
tags: []
connection: python__bauplan__prod__89530
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on:
- bc4e43ee-b669-4c94-b9de-50c3d66c9674
condition: null
name: ''
bc4e43ee-b669-4c94-b9de-50c3d66c9674:
tasks:
767cf854-85a1-40bf-ac40-04b2d4aa3519:
integration: PYTHON
integration_job: PYTHON_EXECUTE_SCRIPT
parameters:
command: python -m sftp
package_manager: PIP
python_version: '3.12'
build_command: pip install -r requirements.txt
environment_variables: '{
"FILE_PATH":"orders.json"
}'
project_dir: python/integration_a
shallow_clone_dirs: python
set_outputs: false
depends_on: []
condition: null
name: Import File to S3
tags: []
connection: python__production__blueprints__19239
operation_metadata: null
treat_failure_as_warning: null
configuration: null
depends_on: []
condition: null
name: ''
schedule: []
sensors: {}
trigger_events: []
webhook:
enabled: false
operation_metadata: null
run_inputs: null
configuration: null
inputs: null

You can see the output of the logs. At the moment running bauplan run
in the CLI is not quite equivalent to
import os
from datetime import datetime
import bauplan
print("Starting WAP at {}!".format(datetime.now()))
bauplan_client = bauplan.Client(api_key=os.environ['BAUPLAN_API_KEY'])
bauplan_client.run(ref="orchestra.dev_hugo", namespace="bauplan")
because you don’t get the nice log messages streamed out automatically. You do get the ones you print, though, and I’m sure more support for this will come soon. I actually wrapped the above with a call to fetch the log info, so you can see how this displays in Orchestra below.

Why this is radically simpler than the MDS today
The reason Snowflake and Cloud Data warehouses took off was because of the structural separation of storage and compute.
It meant you could have more flexible pricing instead of paying Oracle for bulky hard-to-scale on-premises instances.
However, it created complexity. How to get the data there? How to transform the data? How to work with AWS? How to run data quality tests? Do I need lineage? And so on and so forth.
With Orchestra you basically don’t need anything apart from a data warehouse for running an end-to-end pipeline. You can move data, transform it, catalog it, test quality, build workflows across different teams, integrate with other platforms, and so on.
That said, and for complete transparency, I have no interest in building and maintaining tools to move and ingest data. For now I think hosted options are a great choice for reasons I have gone into here.
Every data Ingestion Tool ever Ranked
Why Ingestion Tools will never go away (medium.com)
With Bauplan you don’t even need Snowflake’s compute. Oh and you get to do things on iceberg (sorry we haven’t even mentioned that). This structure of Workflow Platform + Scalable Compute + Iceberg and Object storage is now a legitimate reality.
In a world where data teams are obsessed with tooling and do not even have the bandwidth to consider Iceberg or AI, I’ve just built an e2e stack with two pieces of SaaS on Iceberg that also gives me complete flexibility to move that stack in whatever direction I want. Pretty cool.
Check out the Orchestra Docs here for some nice Orchestration patterns 💡
Conclusion
In a world where it is getting ever easier to build pipelines, managing data stacks still takes huge effort, lots of hiring, and months of time.
Despite this, we’re technologically in a place where a couple of pieces of SaaS and minimal engineering can give us a lakehouse on open storage, which is apparently extremely cutting-edge and only available to the smartest engineers in the world at places like Apple.
What is evident to me is that the bottlenecks are shifting rapidly. What would Data Teams need to do to empower people to use the stack above easily?
The answer, of course, lies in a declarative framework.
Engineers can build reusable components. These are low-level specialised scripts that excel at doing things in a parameterisable way. For example, you might define:
import os

def ingest_accounts():
    pass

def ingest_opportunities():
    pass

def ingest_something_dynamic(object_name: str):
    pass

def ingest_salesforce_data(object: str):
    # route to the right low-level script based on the object name
    if object == 'accounts':
        return ingest_accounts()
    elif object == 'opportunities':
        return ingest_opportunities()
    else:
        return ingest_something_dynamic(object)

def ingest(integration: str, object: str):
    if integration == 'salesforce':
        return ingest_salesforce_data(object)

ingest(os.getenv('integration'), os.getenv('object'))
Here, a single Python script is capable of ingesting any data. To orchestrate this, a user would simply need a prompt like
ingest the new account_orders object from salesforce
That gets supplemented with context such as:
An ingestion script can be located at <directory> under file name <file> and needs these environment variables <[list]>
This yields a pipeline like the one we built above. Of course, if they want to use one of Orchestra’s pre-built blocks that can trigger a Fivetran sync, an AWS Lambda function, an OpenAI API call, etc., they can.
The Data World has gone completely bonkers. There has been an explosion of tools for ever more technical and niche personas that have a tendency to massively overengineer for analytical and operational use-cases. It means Data Teams are now a massive bottleneck for AI adoption and automation.
Data Death Cycle: The Silo Trap
How to avoid common self-service pitfalls (blog.det.life)
The sharpest data teams are recognising they can deliver faster, spend less money, and have leaner teams with simple data stacks like this. Instead of “coming for the spark, and staying for the ecosystem” you can get the ecosystem in one place and get your spark/arrow/duckdb/datafusion/snowflake compute somewhere else.
There is a Swahili proverb I saw on Instagram recently.
“Mtaka yote hukosa yote.”
“One who wants all usually loses all.”
There is a middle ground between buying every tool under the sun and going “all in” on a single vendor. If history has taught us anything, it’s that the tools folks went all-in on, like Talend, Cloudera, and Informatica, all eventually see their end one way or another.
If this isn’t a rude awakening for the Modern Data Stack I don’t know what is.
Learn more about Bauplan
If you’d like to run this Bauplan example on your own, get a free sandbox in this highly exclusive beta here.
Learn more about Orchestra
Orchestra is the only platform that is designed to help companies scale from centralised through to decentralised models of governance. If you’d like to learn more, check out the links below.
📚 Docs
📊 Demo