In [ ]:
...
In [ ]:
# progressivis-snippet
import tempfile
from pathlib import Path
from progressivis.datasets.wget import wget_file
from progressivis import Sink, ArrowBatchLoader
import duckdb
import pyarrow.parquet as pq
# Download the May 2015 yellow-taxi trips and the taxi-zone lookup table into a
# scratch directory, then open a DuckDB query that joins each trip with the
# borough of its pickup zone. The module-level results used by the snippet
# below are `reader` (an Arrow record-batch stream) and `n_rows`.
#
# The scratch directory must outlive `reader`: the stream is only consumed
# later (by ArrowBatchLoader, once the scheduler runs) and DuckDB may still
# need to read the downloaded files then. It is therefore removed eagerly only
# if setup fails; otherwise TemporaryDirectory's finalizer removes it when `td`
# is garbage-collected or when the kernel exits.
td = tempfile.TemporaryDirectory()
taxis_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-05.parquet"
taxis_file = Path(td.name) / "yellow_tripdata_2015-05.parquet"
zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
zones_file = Path(td.name) / "taxi_zone_lookup.csv"
# Inner join of trips (pickup/dropoff timestamps, passenger count) with the
# zone table on the pickup location, keeping the zone's Borough column.
SQL = (
    "SELECT tx.tpep_pickup_datetime, tx.tpep_dropoff_datetime, tx.passenger_count, zn.Borough"
    f" FROM read_parquet('{taxis_file}') tx, read_csv('{zones_file}') zn"
    " WHERE tx.PULocationID=zn.LocationID"
)
try:
    wget_file(url=taxis_url, filename=taxis_file)
    wget_file(url=zones_url, filename=zones_file)
    con = duckdb.connect(database=":memory:")
    # Row count of the raw parquet file (read from its footer metadata, no
    # scan). The join above may drop trips whose PULocationID has no match, so
    # this is presumably an upper bound on what `reader` yields, assuming
    # LocationID is unique in the zone table. TODO confirm.
    n_rows = pq.ParquetFile(taxis_file).metadata.num_rows
    con.execute(SQL)
    # Stream the result as Arrow record batches of up to 1000 rows; `con` must
    # stay open while `reader` is being consumed.
    reader = con.fetch_record_batch(1000)
except BaseException:
    # Setup failed: nothing will read the downloads, so remove them now and
    # propagate the original error.
    td.cleanup()
    raise
@register_snippet
def taxis_zones(input_module, input_slot, columns):
    """Snippet that turns the taxi/borough query stream into a ProgressiVis loader.

    Builds, inside the scheduler of ``input_module``, an ``ArrowBatchLoader``
    fed by the module-level ``reader`` (announcing ``n_rows`` rows). It
    connects a ``Sink`` to the loader's ``result`` slot and returns the
    loader's ``result`` slot as the snippet output.

    ``input_slot`` and ``columns`` are part of the snippet signature but are
    not used here; ``input_module`` is only consulted for its scheduler.

    NOTE(review): ``reader`` is a single module-level stream shared by every
    call; presumably it can only be consumed once, so invoking this snippet
    more than once may yield an exhausted loader. Confirm.
    """
    sched = input_module.scheduler
    with sched:
        loader = ArrowBatchLoader(reader=reader, n_rows=n_rows, scheduler=sched)
        # Give the loader's output a downstream consumer.
        sink_module = Sink(scheduler=sched)
        sink_module.input.inp = loader.output.result
    return SnippetResult(output_module=loader, output_slot="result")
root¶
In [1]:
...
Taxis Borough¶
In [2]:
# Display the constructor widget for the 'Taxis Borough' step (the meaning of
# the trailing 0 is not visible in this notebook; TODO confirm).
Constructor.widget('Taxis Borough', 0)
Out[2]:
Group by¶
In [3]:
# Display the constructor widget for the 'Group by' step (the meaning of the
# trailing 0 is not visible in this notebook; TODO confirm).
Constructor.widget('Group by', 0)
Out[3]:
Aggregate¶
In [4]:
# Display the constructor widget for the 'Aggregate' step (the meaning of the
# trailing 0 is not visible in this notebook; TODO confirm).
Constructor.widget('Aggregate', 0)
Out[4]:
Any Vega¶
In [5]:
# Display the constructor widget for the 'Any Vega' step (the meaning of the
# trailing 0 is not visible in this notebook; TODO confirm).
Constructor.widget('Any Vega', 0)
Out[5]: