Module Library#
I/O Modules#
Basic Printing#
- class progressivis.io.print.Every#
This module runs a function at each run_step with the content of its input slot, which can be of any type. By default, it prints the length of its input, which could be useful for PTable, PDict, and PIntSet.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
df (Any) – Any input slot to watch
- __init__(proc=<function _print_len>, constant_time=True, **kwds)#
- Parameters:
proc (Callable[[Any], None]) – callable with one argument, which will be the input slot data.
constant_time (bool) – True by default; set it to False if the printing function takes a time roughly proportional to the data size
kwds (Any) – extra keyword args to be passed to the Module superclass
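The proc argument is any callable taking one argument. As a minimal sketch, not taken from the library, the hypothetical record_len below could stand in for the default printer. The loop only simulates successive run steps with plain Python containers; no real input slot is involved.

```python
from typing import Any, List

seen_sizes: List[int] = []  # hypothetical log, kept only for illustration


def record_len(data: Any) -> None:
    """Hypothetical proc: print and remember the size of the slot content."""
    size = 0 if data is None else len(data)
    seen_sizes.append(size)
    print(f"input currently holds {size} item(s)")


# Stand-ins for the slot content seen at three successive run steps.
for snapshot in ([1, 2], [1, 2, 3, 4], {"a": 1}):
    record_len(snapshot)
```

Going by the signature above, such a callable would presumably be passed as Every(proc=record_len, scheduler=scheduler).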
- class progressivis.io.print.Print#
This module prints the contents of its input slot.
- class progressivis.io.print.Tick#
This module prints a tick string (a dot by default) when data arrives in the input slot. It is useful for monitoring the liveness of a pipeline; different characters or strings can differentiate different parts of a pipeline.
Simple CSV Loader#
- class progressivis.io.simple_csv_loader.SimpleCSVLoader#
This module reads comma-separated values (csv) files progressively into a PTable. Optionally, it provides information about missing values and anomalies via the anomalies and missing output slots. Internally it uses pandas.read_csv.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
- Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, as_array=None, throttle=False, imputer=None, **kwds)#
- Parameters:
filepath_or_buffer (Any | None) – str, path object or file-like object accepted by pandas.read_csv
filter_ (Optional[Callable[[DataFrame], DataFrame]]) – filtering function to be applied on input data at loading time
Example
>>> def filter_(df):
...     lon = df['dropoff_longitude']
...     lat = df['dropoff_latitude']
...     return df[(lon>-74.10)&(lon<-73.7)&(lat>40.60)&(lat<41)]
force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
fillvalues (dict[str, Any] | None) – the default values of the columns specified as a dictionary (see PTable)
throttle (bool | int | float) – limit the number of rows to be loaded in a step
imputer (SimpleImputer | None) – a SimpleImputer provides basic strategies for imputing missing values
kwds (Any) – extra keyword args to be passed to pandas.read_csv and Module superclass
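The exact renaming rule behind force_valid_ids is not spelled out here. The sketch below shows one plausible rule for turning arbitrary column names into valid Python identifiers; make_valid_id is a hypothetical helper written for illustration, not the loader's implementation.

```python
import keyword
import re


def make_valid_id(name: str) -> str:
    """Hypothetical helper: map a column name to a valid Python identifier."""
    candidate = re.sub(r"\W", "_", name)  # replace anything that is not a word character
    # Prefix names that are empty, start with a digit, or are reserved words.
    if not candidate or candidate[0].isdigit() or keyword.iskeyword(candidate):
        candidate = "_" + candidate
    return candidate


columns = ["pickup latitude", "2nd_col", "class", "fare"]
renamed = [make_valid_id(c) for c in columns]
print(renamed)
```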
Reliable CSV Loader#
- class progressivis.io.csv_loader.CSVLoader#
This module reads comma-separated values (csv) files progressively into a PTable. It is able to resume the reading after a network failure or a crash. ⚠️ This module does not wait for filenames!
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
- Output slots:
result (PTable) – Provides read data into a PTable object
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, as_array=None, timeout=None, save_context=True, recovery=False, recovery_tag='', recovery_table_size=3, save_step_size=100000, **kwds)#
- Parameters:
filepath_or_buffer (Optional[Any]) – str, path object or file-like object accepted by pandas.read_csv
NB: the filenames slot and the filepath_or_buffer init parameter cannot both be defined
filter_ (Optional[Callable[[DataFrame], DataFrame]]) – filtering function to be applied on input data at loading time
Example
>>> def filter_(df):
...     lon = df['dropoff_longitude']
...     lat = df['dropoff_latitude']
...     return df[(lon>-74.10)&(lon<-73.7)&(lat>40.60)&(lat<41)]
force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
fillvalues (Optional[Dict[str, Any]]) – the default values of the columns specified as a dictionary (see PTable)
as_array (Union[str, Dict[str, List[str]], Callable[[List[str]], Dict[str, List[str]]], None]) – allows lists of n input columns to be grouped into output n-D columns (grouped column types must be identical):
when as_array is a Dict[str, List[str]], every entry represents a grouping rule
- Example:
>>> CSVLoader(
...     get_dataset("bigfile"),
...     as_array={
...         "first_group": ["_1", "_2", "_3"],
...         "second_group": ["_11", "_12", "_13"],
...     },
...     header=None,
...     scheduler=s,
... )
when as_array is a str, it gives the name of a unique n-D output column which groups all input columns
when as_array is a Callable, it must be able to process the list of all input columns and return a Dict[str, List[str]]
- Example:
>>> CSVLoader(
...     get_dataset("mnist_784"),
...     as_array=lambda cols: {"array": [
...         c for c in cols if c != "class"]},
...     scheduler=s,
... )
timeout (Optional[float]) – stop waiting for a response and raise an exception after a given number of seconds
save_context (bool) – if True, and if filepath_or_buffer designates a recoverable support (i.e. a filepath), save the information allowing recovery after a failure
NB: currently backup only works when data input is provided via filepath_or_buffer
recovery (bool) – when False (default) the data entry is read from the beginning; otherwise the previous reading is resumed using the saved information
recovery_tag (Union[str, int]) – customize save tags (useful mainly for debugging)
recovery_table_size (int) – defines the size of the recovery table (useful mainly for debugging)
save_step_size (int) – defines the number of rows to read between two context snapshots
kwds (Any) – extra keyword args to be passed to pandas.read_csv and Module superclass
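The three accepted forms of as_array can all be reduced to a single Dict[str, List[str]] of grouping rules. The sketch below illustrates that normalization in plain Python; resolve_as_array is a hypothetical helper written for this page, not a function of the library.

```python
from typing import Callable, Dict, List, Optional, Union

AsArray = Union[
    str, Dict[str, List[str]], Callable[[List[str]], Dict[str, List[str]]], None
]


def resolve_as_array(
    as_array: AsArray, columns: List[str]
) -> Optional[Dict[str, List[str]]]:
    """Hypothetical helper: normalize as_array into explicit grouping rules."""
    if as_array is None:
        return None  # no grouping requested
    if isinstance(as_array, str):
        return {as_array: list(columns)}  # one n-D column holding every input column
    if callable(as_array):
        return as_array(list(columns))  # the callable computes the rules
    return dict(as_array)  # already a mapping of grouping rules


cols = ["_1", "_2", "_3", "class"]
by_name = resolve_as_array("array", cols)
by_rule = resolve_as_array({"first_group": ["_1", "_2"]}, cols)
by_func = resolve_as_array(lambda cs: {"array": [c for c in cs if c != "class"]}, cols)
```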
CSV Loader via PyArrow#
- class progressivis.io.pa_csv_loader.PACSVLoader#
This module reads comma-separated values (csv) files progressively into a PTable using the PyArrow backend.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
- Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, read_options=None, parse_options=None, convert_options=None, drop_na=True, **kwds)#
- Parameters:
filepath_or_buffer (Optional[Any]) – string, path object or file-like object accepted by pyarrow.csv.open_csv
filter_ (Optional[Callable[[RecordBatch], RecordBatch]]) – filtering function to be applied on input data at loading time
force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
fillvalues (Optional[Dict[str, Any]]) – the default values of the columns specified as a dictionary (see PTable)
throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step
read_options (Optional[ReadOptions]) – Options for the CSV reader (see pyarrow.csv.ReadOptions)
parse_options (Optional[ParseOptions]) – Options for the CSV parser (see pyarrow.csv.ParseOptions)
convert_options (Optional[ConvertOptions]) – Options for converting CSV data (see pyarrow.csv.ConvertOptions)
drop_na (Optional[bool]) – drop rows containing missing and/or invalid values. NB: invalid values are replaced by NA values and reported as invalid in the anomalies output slot
kwds (Any) – extra keyword args to be passed to Module superclass
- property parser: CSVStreamingReader#
When data contains invalid values, pyarrow.csv.open_csv can raise an ArrowInvalid exception (even though no rows were fetched yet…), so the CSVStreamingReader is instantiated lazily.
- get_progress()#
Return a tuple of numbers (current,total) where current is current progress value and total is the total number of values to process; these values can change during the computations.
- recovering()#
Transforms invalid values into NA values
- Return type:
Parquet Loader via PyArrow#
- class progressivis.io.parquet_loader.ParquetLoader#
This module reads parquet files progressively into a PTable using the PyArrow backend.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
- Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, columns=None, **kwds)#
- Parameters:
filepath_or_buffer (Optional[Any]) – string, path object or file-like object accepted by pyarrow.csv.open_csv
filter_ (Optional[Callable[[RecordBatch], RecordBatch]]) – filtering function to be applied on input data at loading time
force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
fillvalues (Optional[Dict[str, Any]]) – the default values of the columns specified as a dictionary (see PTable)
throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step
read_options – Options for the CSV reader (see pyarrow.csv.ReadOptions)
parse_options – Options for the CSV parser (see pyarrow.csv.ParseOptions)
convert_options – Options for converting CSV data (see pyarrow.csv.ConvertOptions)
kwds (Any) – extra keyword args to be passed to pyarrow.parquet.ParquetFile, pyarrow.parquet.iter_batches and Module superclass
Constant Module#
- class progressivis.table.constant.Constant#
Module providing a constant output PTable slot
Variable Module#
- class progressivis.io.variable.Variable#
This module allows external events (coming from widgets, commands, etc.) to be taken into account
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Output slots:
result (PDict) – Returns the dictionary provided by from_input() after translation (if applicable)
- __init__(init_val=None, translation=None, **kwds)#
- Parameters:
init_val (Optional[Dict[str, Any]]) – initial value for the result
translation (Optional[Dict[str, Any]]) – sometimes dictionaries provided via from_input() could contain multiple aliases for the same canonical key (e.g. pickup_latitude and dropoff_latitude as aliases for latitude). In order to fix that, translation should provide an alias-canonical key mapping dictionary (e.g. {'pickup_latitude': 'latitude', 'dropoff_latitude': 'latitude'})
kwds (Any) – extra keyword args to be passed to Module superclass
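The alias-to-canonical-key mapping described for translation can be pictured with plain dictionaries. The sketch below is an assumption about the behaviour, not the module's code; translate is a hypothetical helper, and keys without an alias entry are assumed to pass through unchanged.

```python
from typing import Any, Dict, Optional


def translate(
    values: Dict[str, Any], translation: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Hypothetical helper: rename aliased keys to their canonical key."""
    if not translation:
        return dict(values)
    # Keys missing from the mapping are kept as they are (assumption).
    return {translation.get(key, key): val for key, val in values.items()}


aliases = {"pickup_latitude": "latitude", "dropoff_latitude": "latitude"}
from_pickup = translate({"pickup_latitude": 40.7}, aliases)
from_dropoff = translate({"dropoff_latitude": 40.9, "zoom": 3}, aliases)
```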
Data Filtering Modules#
Simple Filtering Module#
- class progressivis.table.simple_filter.SimpleFilter#
Filtering module based on a simple condition of the form <column> <operator> <value> where <operator> := '>' | '>=' | '<' | '<='
- Module parameters:
column (object) – default: unknown. Filtering column (condition's left operand)
op (object) – default: >. Relational operator (i.e. '>', '>=', '<', '<=')
value_key (object) – default: ''. See the value input below
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – input table or view
value (PDict) – contains the condition's right operand. If value_key is provided, the right operand is value[value_key]; otherwise the first value in the dict is used
hist (PTable) – BinningIndex module output connected to the same input/column. This mandatory parameter could be provided by the create_dependent_modules() method.
- Output slots:
result (PTableSelectedView)
- __init__(**kwds)#
- Parameters:
kwds (Any) – extra keyword args to be passed to the Module superclass
- create_dependent_modules(input_module, input_slot, hist_index=None, **kwds)#
Creates a default configuration.
- Parameters:
input_module (Module) – the input module (see the example)
input_slot (str) – the input slot name (e.g. result)
hist_index (Optional[BinningIndex]) – optional histogram index. If not provided, a BinningIndex is created
kwds (Any) – extra keyword args to be passed to the Module superclass
- Return type:
CMP Query Module (via NumExpr)#
- class progressivis.table.cmp_query.CmpQueryLast#
Filtering module applying on rows of table a constraint based on all values of the last row of cmp, of the form:
(table[:, col1] <op> last[col1]) <combine> (table[:, col2] <op> last[col2]) ... (table[:, colN] <op> last[colN])
where:
last is the last row of the cmp table
and <op> := '>' | '>=' | '<' | '<=' | '==' | '!='
and <combine> := 'and' | 'or' | 'xor'
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
- Output slots:
select (PIntSet) – required: False. Selected indices
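The constraint above can be spelled out on plain Python data. In this sketch, tables are dictionaries mapping column names to lists of values, which is only a stand-in for PTable, and cmp_query_last is a hypothetical function, not the module's implementation. It evaluates the per-column comparison against the last row of cmp and folds the results with the chosen combinator.

```python
import operator
from functools import reduce
from typing import Dict, List

OPS = {
    ">": operator.gt, ">=": operator.ge, "<": operator.lt,
    "<=": operator.le, "==": operator.eq, "!=": operator.ne,
}
COMBINE = {"and": operator.and_, "or": operator.or_, "xor": operator.xor}


def cmp_query_last(
    table: Dict[str, List[float]],
    cmp: Dict[str, List[float]],
    op: str = "<",
    combine: str = "and",
) -> List[int]:
    """Return the indices of the rows of table satisfying the constraint."""
    last = {col: values[-1] for col, values in cmp.items()}  # last row of cmp
    n_rows = len(next(iter(table.values())))
    selected = []
    for i in range(n_rows):
        tests = [OPS[op](table[col][i], last[col]) for col in last]
        if reduce(COMBINE[combine], tests):
            selected.append(i)
    return selected


table = {"a": [1, 5, 3], "b": [2, 1, 9]}
cmp = {"a": [0, 4], "b": [7, 5]}  # last row: a=4, b=5
sel_and = cmp_query_last(table, cmp, "<", "and")
sel_or = cmp_query_last(table, cmp, "<", "or")
sel_xor = cmp_query_last(table, cmp, "<", "xor")
```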
Numerical Expression Filtering Module (via NumExpr)#
- class progressivis.table.filtermod.FilterMod#
Filtering module based on the numexpr library.
- Module parameters:
- Input slots:
table (PTable) – Data input
selection (PIntSet) – required: False
- Output slots:
result (PTableSelectedView) – Returns a filtered view
Example:
from progressivis import Sink, Scheduler, RandomPTable
from progressivis.table.filtermod import FilterMod

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    filter_ = FilterMod(scheduler=scheduler)
    filter_.params.expr = "_1 > 0.5"
    filter_.input.table = random.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = filter_.output.result
The underlying graph:
The Range Query Module#
- class progressivis.table.range_query.RangeQuery#
Selects rows that contain values within a provided range along a given axis (column)
- Module parameters:
column (object) – default: unknown. The column in the table input slot concerned by the query. This parameter is mandatory
watched_key_lower (object) – default: ''. The key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column parameter is used instead.
watched_key_upper (object) – default: ''. The key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column parameter is used instead.
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
lower (PDict) – Provides a PDict object containing the lower bound of the query. The key giving the bound is set by the watched_key_lower parameter when it is different from the column parameter.
upper (PDict) – Provides a PDict object containing the upper bound of the query. The key giving the bound is set by the watched_key_upper parameter when it is different from the column parameter.
min (PDict) – The minimum value in the input data. This mandatory parameter could be provided by the create_dependent_modules() method.
max (PDict) – The maximum value in the input data. This mandatory parameter could be provided by the create_dependent_modules() method.
timestamps (PDict) – required: False. Gives information about bins changed between 2 run steps
index (PTable) – BinningIndex module output connected to the same input/column. This mandatory parameter could be provided by the create_dependent_modules() method.
- Output slots:
result (PTableSelectedView) – Query main result
min (PDict) – required: False, attr_name: _min_table. Min value of output data
max (PDict) – required: False, attr_name: _max_table. Max value of output data
- __init__(approximate=False, **kwds)#
- create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, hist_index=None, **kwds)#
Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, the corresponding min or max module is created and connected. With False, it is neither created nor connected.
- Return type:
Module RangeQuery is not self-sufficient. It needs other modules to work. A simple way to provide it with an environment that allows it to work properly is to use the create_dependent_modules() method.
This convenience method creates a set of modules connected to RangeQuery that produce the inputs required for its operation in most cases.
In the example below it is called with the default values:
from progressivis import Sink, Scheduler, RandomPTable, RangeQuery

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    range_qry = RangeQuery(column="_1", scheduler=scheduler)
    range_qry.create_dependent_modules(random, "result")
    sink = Sink(scheduler=scheduler)
    sink.input.inp = range_qry.output.result
    sink.input.inp = range_qry.output.min
    sink.input.inp = range_qry.output.max
And in this case it produces the following topology:
The Range Query 2D Module#
- class progressivis.table.range_query_2d.RangeQuery2D#
Selects rows that contain values within a provided 2D interval (two columns)
- Module parameters:
column_x (object) – default: unknown. The x axis column in the table input slot concerned by the query. This parameter is mandatory
column_y (object) – default: unknown. The y axis column in the table input slot concerned by the query. This parameter is mandatory
watched_key_lower_x (object) – default: ''. The x axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column parameter is used instead.
watched_key_upper_x (object) – default: ''. The x axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column parameter is used instead.
watched_key_lower_y (object) – default: ''. The y axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column parameter is used instead.
watched_key_upper_y (object) – default: ''. The y axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column parameter is used instead.
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
lower (PDict) – required: False. Provides a PDict object containing the 2D lower bound of the query. The x, y axis keys giving the bound are set by the watched_key_lower_{x|y} parameters when they are different from the column_{x|y} parameters.
upper (PDict) – required: False. Provides a PDict object containing the 2D upper bound of the query. The x, y axis keys giving the bound are set by the watched_key_upper_{x|y} parameters when they are different from the column_{x|y} parameters.
min (PDict) – The minimum value in the input data. This mandatory parameter could be provided by the create_dependent_modules() method.
max (PDict) – The maximum value in the input data. This mandatory parameter could be provided by the create_dependent_modules() method.
timestamps_x (PDict) – required: False. Gives information about bins changed between 2 run steps on the x axis
timestamps_y (PDict) – required: False. Gives information about bins changed between 2 run steps on the y axis
index (PTable) – BinningIndexND module output connected to the x filtering input/column. This mandatory parameter could be provided by the create_dependent_modules() method.
- Output slots:
result (PTableSelectedView)
min (PDict) – required: False, attr_name: _min_table. min doc
max (PDict) – required: False, attr_name: _max_table
- __init__(approximate=False, **kwds)#
- create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, **kwds)#
Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, the corresponding min or max module is created and connected. With False, it is neither created nor connected.
- Return type:
Just like RangeQuery, the module RangeQuery2D is not self-sufficient. In order to provide it with an environment, the create_dependent_modules() method can be used in the same way:
from progressivis import Sink, Scheduler, RandomPTable, RangeQuery2D

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    range_qry = RangeQuery2D(column_x="_1", column_y="_2", scheduler=scheduler)
    range_qry.create_dependent_modules(random, "result")
    sink = Sink(scheduler=scheduler)
    sink.input.inp = range_qry.output.result
    sink.input.inp = range_qry.output.min
    sink.input.inp = range_qry.output.max
Categorical Query Module#
- class progressivis.table.categorical_query.CategoricalQuery#
Selects rows whose values in a given column belong to a provided subset. It is convenient for categorical data.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – input data
choice (PDict) – provides the subset of categorical values to be queried
- Output slots:
result (PTableSelectedView)
Like the previous query modules, CategoricalQuery is not self-sufficient. Use create_dependent_modules() to initialize its environment:
from progressivis import Sink, Scheduler, CSVLoader, ConstDict, PDict
from progressivis.table.categorical_query import CategoricalQuery

scheduler = Scheduler()
with scheduler:
    csv = CSVLoader("path/to/your/data.csv", scheduler=scheduler)
    # NB: data.csv contains a categorical column named "category"
    query = CategoricalQuery(column="category", scheduler=scheduler)
    query.create_dependent_modules(input_module=csv)
    ct = ConstDict(PDict({"only": ["A", "C"]}), scheduler=scheduler)
    query.input.choice = ct.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = query.output.result
Indexing Modules#
Binning Index Module#
- class progressivis.table.binning_index.BinningIndex#
Compute and maintain a binned index by dividing the entire range of values of a column into a series of intervals (binning). Each bin contains the set of indices of rows in the underlying interval in order to provide fast access to these rows. Actually BinningIndex is able to build multiple indices, one for each column provided in the table slot hint.
- Module parameters:
tol (float64) – default: -1. Tolerance determining the bins width. Negative values represent %, positive values are absolute
init_threshold (int64) – default: 10000
max_trials (int64) – default: 5
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint
- Output slots:
result (PTableSelectedView)
bin_timestamps (PDict) – required: False
min_out (PDict) – required: False
max_out (PDict) – required: False
- __init__(column=None, **kwds)#
- get_quality()#
Return a dictionary associating names with quality values, or None when a quality does not make sense. The values should increase when the quality increases.
- query(operator_, limit, approximate=False, only_bins=PIntSet([]))#
Return the list of rows matching the query. For example, returning all values less than 10 (< 10) would be query(operator.__lt__, 10)
- Return type:
- restricted_query(operator_, limit, only_locs, approximate=False, only_bins=PIntSet([]))#
Return the list of rows matching the query. For example, returning all values less than 10 (< 10) would be query(operator.__lt__, 10)
- Return type:
- range_query(lower, upper, approximate=False)#
Return the list of rows with values in range [lower, upper[
- Return type:
- range_query_asgen(lower, upper, approximate=False)#
Return the list of rows with values in range [lower, upper[
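To make the idea of a binned index concrete, here is a toy version in plain Python. TinyBinningIndex is a hypothetical class written for illustration only. It uses fixed-width bins, and it assumes that approximate=True means "return the edge bins whole, without checking each value", which is an interpretation rather than something stated above.

```python
from collections import defaultdict
from typing import Dict, Set


class TinyBinningIndex:
    """Toy binned index: each bin stores the row ids whose value falls in it."""

    def __init__(self, origin: float, bin_width: float) -> None:
        self.origin = origin
        self.bin_width = bin_width
        self.bins: Dict[int, Set[int]] = defaultdict(set)
        self.values: Dict[int, float] = {}

    def _bin_of(self, value: float) -> int:
        return int((value - self.origin) // self.bin_width)

    def add(self, row_id: int, value: float) -> None:
        self.values[row_id] = value
        self.bins[self._bin_of(value)].add(row_id)

    def range_query(self, lower: float, upper: float, approximate: bool = False) -> Set[int]:
        """Row ids with values in [lower, upper[."""
        first, last = self._bin_of(lower), self._bin_of(upper)
        result: Set[int] = set()
        for b in range(first, last + 1):
            rows = self.bins.get(b, set())
            if approximate or first < b < last:
                result |= rows  # inner bins lie entirely inside the range
            else:
                # Edge bins may straddle a bound: check each value.
                result |= {r for r in rows if lower <= self.values[r] < upper}
        return result


index = TinyBinningIndex(origin=0.0, bin_width=1.0)
for row_id, value in enumerate([0.2, 0.9, 1.5, 2.7, 3.1]):
    index.add(row_id, value)
exact = index.range_query(0.5, 3.0)
approx = index.range_query(0.5, 3.0, approximate=True)
```

Only the two edge bins need a per-value check, which is where the speed-up of a binned index comes from.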
Binning Index N-dim Module#
- class progressivis.table.binning_index_nd.BinningIndexND#
Compute and maintain a binned index by dividing the entire range of values of a column into a series of intervals (binning). Each bin contains the set of indices of rows in the underlying interval in order to provide fast access to these rows. Actually BinningIndexND is able to build multiple indices, one for each column provided in the table slot hint.
- Module parameters:
tol (float64) – default: -1. Tolerance determining the bins width. Negative values represent %, positive values are absolute
init_threshold (int64) – default: 10000
max_trials (int64) – default: 5
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint
- Output slots:
result (PTableSelectedView)
bin_timestamps_0 (PDict) – required: False
bin_timestamps_1 (PDict) – required: False
min_out (PDict) – required: False
max_out (PDict) – required: False
- __init__(**kwds)#
- query(column, operator_, limit, approximate=False, only_bins=PIntSet([]))#
Return the list of rows matching the query. For example, returning all values less than 10 (< 10) would be query(operator.__lt__, 10)
- Return type:
- restricted_query(column, operator_, limit, only_locs, approximate=False, only_bins=PIntSet([]))#
Return the list of rows matching the query. For example, returning all values less than 10 (< 10) would be query(operator.__lt__, 10)
- Return type:
- range_query(column, lower, upper, approximate=False)#
Return the list of rows with values in range [lower, upper[
- Return type:
- range_query_asgen(column, lower, upper, approximate=False)#
Return the list of rows with values in range [lower, upper[
- restricted_range_query(column, lower, upper, only_locs, approximate=False, only_bins=PIntSet([]))#
Return the list of rows with values in range [lower, upper[ among only_locs
- Return type:
Unique Index Module#
Data Grouping/Joining/Aggregation Modules#
Join-by-id Module#
Join Module#
- class progressivis.table.join.Join#
{many|one}-to-one join module
- Parameters:
primary_on – column or list of columns giving the primary key on the primary table
related_on – column or list of columns giving the foreign key on the related table
on – shortcut when primary_on and related_on are identical
how (Union[Literal['inner'], Literal['outer']]) – {inner|outer}. NB: outer provides only outer rows on the related table
kwds (Any) – argument to pass to the join function
- __init__(*, how='inner', fillna=None, inv_mask=None, **kwds)#
- create_dependent_modules(primary_module, related_module, *, primary_slot='result', related_slot='result', primary_cols=None, related_cols=None, on=None, primary_on=None, related_on=None, suffix='')#
- Parameters:
primary_module (Module) – module providing the primary data source (primary key owner)
related_module (Module) – module providing the related data source (foreign key owner)
primary_slot (str) – …
related_slot (str) – …
primary_cols (Optional[List[str]]) – primary table (virtual) columns to be included in the output view
related_cols (Optional[List[str]]) – related table columns to be included in the output view
primary_on (Union[str, List[str], None]) – column or list of columns giving the primary key on the primary table
related_on (Union[str, List[str], None]) – column or list of columns giving the foreign key on the related table
on (Union[str, List[str], None]) – shortcut when primary_on and related_on are identical
suffix (str) – …
- Return type:
from progressivis import Sink, Scheduler, ParquetLoader, CSVLoader, Join

PARQUET_FILE = "path/to/yellow_tripdata_2015-01.parquet"
CSV_URL = "path/to/taxi+_zone_lookup.csv"

scheduler = Scheduler()
with scheduler:
    parquet = ParquetLoader(PARQUET_FILE, scheduler=scheduler)
    csv = CSVLoader(CSV_URL, scheduler=scheduler)
    join = Join(how="inner", scheduler=scheduler)
    join.create_dependent_modules(
        related_module=parquet,
        primary_module=csv,
        related_on=["DOLocationID"],
        primary_on=["LocationID"]
    )
    sink = Sink(scheduler=scheduler)
    sink.input.inp = join.output.result
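What a {many|one}-to-one join computes can be shown on plain lists of dictionaries. The sketch below is not the module's implementation; join_many_to_one and the toy rows are made up for illustration. It reads "outer provides only outer rows on the related table" as: unmatched related rows are kept and padded with None. That reading is an assumption.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def join_many_to_one(
    related: List[Row],
    primary: List[Row],
    related_on: str,
    primary_on: str,
    how: str = "inner",
) -> List[Row]:
    """Attach to each related row the primary row whose key matches."""
    lookup = {row[primary_on]: row for row in primary}  # primary keys are unique
    primary_cols = list(primary[0]) if primary else []
    joined: List[Row] = []
    for row in related:
        match = lookup.get(row[related_on])
        if match is None:
            if how == "inner":
                continue  # inner: drop related rows without a primary match
            match = {col: None for col in primary_cols}  # outer: pad with None
        joined.append({**row, **match})
    return joined


zones = [{"LocationID": 1, "Zone": "A"}, {"LocationID": 2, "Zone": "B"}]
trips = [
    {"DOLocationID": 1, "fare": 10.0},
    {"DOLocationID": 1, "fare": 4.0},
    {"DOLocationID": 9, "fare": 3.0},
]
inner = join_many_to_one(trips, zones, "DOLocationID", "LocationID")
outer = join_many_to_one(trips, zones, "DOLocationID", "LocationID", how="outer")
```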
Group-By Module#
Aggregate Module#
Set and Flow Control Operations#
Intersection Module#
- class progressivis.table.intersection.Intersection#
Intersection Module. It computes the intersection of indices for all its inputs and provides a view containing rows shared by all input tables or views. ⚠️ All inputs are based on the same physical table. The columns of the output table are given by the common physical table.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (BasePTable) – multiple: True. Many tables or views sharing the same physical table
- Output slots:
result (PTableSelectedView) – View on the physical table shared by inputs. Its index is the intersection of the inputs' indices
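Since all inputs are views on the same physical table, the output index is simply the set intersection of the input indices. A minimal sketch with plain Python sets, where intersect_indices is a hypothetical helper and not the module's code:

```python
from functools import reduce
from typing import Iterable, Set


def intersect_indices(*views: Iterable[int]) -> Set[int]:
    """Row ids present in every input view (at least one view is expected)."""
    return reduce(set.intersection, (set(v) for v in views))


shared = intersect_indices([0, 1, 2, 3], [2, 3, 4], [3, 2, 9])
print(sorted(shared))
```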
Merging dictionaries Module#
- class progressivis.table.merge_dict.MergeDict#
Gathers many (dict) outputs in one. Assumes there is no clash. Complementary to the Switch module
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PDict) – multiple: True. Multiple inputs providing dictionaries
- Output slots:
result (PDict) – Merged dictionary
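Merging without clashes amounts to a union of dictionaries with disjoint keys. The sketch below uses plain dicts, and merge_dicts is a hypothetical helper. Unlike the module, which only assumes the absence of clashes, the sketch checks for them explicitly so the assumption is visible.

```python
from typing import Any, Dict


def merge_dicts(*inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Union of dictionaries; raises if two inputs share a key."""
    merged: Dict[str, Any] = {}
    for d in inputs:
        clash = merged.keys() & d.keys()
        if clash:
            raise ValueError(f"clashing keys: {sorted(clash)}")
        merged.update(d)
    return merged


merged = merge_dicts({"min": 0.1}, {"max": 0.9}, {"count": 42})
```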
Switching Module#
- class progressivis.table.switch.Switch#
Switch the input to the result or result_else output at runtime, based on condition
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – Data input
- Output slots:
result (PTableSelectedView) – replays the input if the condition is satisfied
result_else (PTableSelectedView) – replays the input if the condition fails
- __init__(condition, **kwds)#
Statistical Modules#
Histograms#
- class progressivis.stats.histogram1d.Histogram1D#
Compute the histogram of a scalar, numerical column in the input table
- Module parameters:
bins (int64) – default: 128. The number of equal-width bins in the range given by the min/max inputs interval
delta (float64) – default: -5. Tolerance threshold for variations in the min/max values at which the bounds are changed. Negative values represent %, positive values are absolute
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. The input table
min (PDict) – The minimum value in the input data. It could be provided by a Min module
max (PDict) – The maximum value in the input data. It could be provided by a Max module
- Output slots:
result (PDict) – datashape: {'array': <class 'numpy.ndarray'>, 'min': <class 'float'>, 'max': <class 'float'>, 'time': <class 'int'>}. The output table. Its datashape is { array: var * int32, min: float64, max: float64, time: int64 }
- __init__(column=None, **kwds)#
- get_quality()#
Return a dictionary associating names with quality values, or None when a quality does not make sense. The values should increase when the quality increases.
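The bins and delta parameters of Histogram1D can be illustrated without the library. The sketch below counts values in equal-width bins over [lower, upper] and resolves a delta tolerance. Both function names are hypothetical, and reading a negative delta as a percentage of the current min/max range is an assumption: the text only says that negative values represent %.

```python
from typing import List, Sequence


def equal_width_histogram(
    values: Sequence[float], lower: float, upper: float, bins: int = 128
) -> List[int]:
    """Count values in `bins` equal-width bins spanning [lower, upper]."""
    counts = [0] * bins
    width = (upper - lower) / bins
    if width <= 0:
        return counts  # degenerate range: nothing to bin
    for v in values:
        if lower <= v <= upper:
            # A value equal to the upper bound goes into the last bin.
            counts[min(int((v - lower) / width), bins - 1)] += 1
    return counts


def resolve_tolerance(delta: float, lower: float, upper: float) -> float:
    """Negative delta: percentage of the range (assumed); positive: absolute."""
    if delta < 0:
        return -delta * (upper - lower) / 100
    return delta


counts = equal_width_histogram([0.0, 0.1, 0.5, 0.99, 1.0], 0.0, 1.0, bins=4)
tol_percent = resolve_tolerance(-5, 0.0, 200.0)
tol_absolute = resolve_tolerance(3.0, 0.0, 200.0)
```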
- class progressivis.stats.histogram2d.Histogram2D#
Compute the 2D histogram of two scalar, numerical columns in the input table. These two columns are referred to as x_column and y_column here.
- Module parameters:
xbins (int64) – default: 64. The number of bins (as defined for Histogram1D) over the x axis
ybins (int64) – default: 64. The number of bins (as defined for Histogram1D) over the y axis
xdelta (float64) – default: -5. The delta threshold (as defined for Histogram1D) over the x axis
ydelta (float64) – default: -5. The delta threshold (as defined for Histogram1D) over the y axis
history (int64) – default: 3
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts dict[str, str] as hint. The input table
min (PDict) – The minimum value in the input data. It could be provided by a Min module
max (PDict) – The maximum value in the input data. It could be provided by a Max module
- Output slots:
result (PTable) – datashape: {'array': <class 'numpy.ndarray'>, 'cmin': <class 'float'>, 'cmax': <class 'float'>, 'xmin': <class 'float'>, 'xmax': <class 'float'>, 'ymin': <class 'float'>, 'ymax': <class 'float'>, 'time': <class 'int'>}. The output table
- __init__(x_column='', y_column='', with_output=True, **kwds)#
- get_quality()#
Return a dictionary associating names with quality values, or None when a quality does not make sense. The values should increase when the quality increases.
- class progressivis.stats.histogram1d_categorical.Histogram1DCategorical#
Compute the histogram (i.e. the bar chart) of a categorical column in the input table
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – The input table
- Output slots:
result (PDict) – Dictionary of occurrence counters where every key represents a categorical value
- __init__(column, **kwds)#
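The occurrence-counter result can be pictured with a small plain-Python sketch that updates a `collections.Counter` as chunks of a categorical column arrive. This only illustrates the idea; the chunk contents are made up and the module's internals may differ.

```python
from collections import Counter

# Each inner list stands for the categorical values seen in one chunk
# (illustrative data, not taken from the library).
chunks = (["red", "blue", "red"], ["green", "red"])

counters: Counter = Counter()
for chunk in chunks:
    # Counter.update adds the occurrences of each value to the totals.
    counters.update(chunk)

result = dict(counters)
print(result)
```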
Max / IdxMax#
- class progressivis.stats.max.Max#
Computes the maximum of the values for every column of an input table.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided, all input columns are processed
- Output slots:
result (PDict) – Dictionary of maximum values where every key represents a column
- class progressivis.stats.idxmax.IdxMax#
Computes the indices of the maximum of the values for every column
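A plain-Python sketch of how per-column maxima and their row indices can be maintained across chunks is given below. It assumes rows are numbered consecutively from 0 and that every chunk has the same columns; `update_max` is a hypothetical helper, not a progressivis function.

```python
from typing import Dict, List

Chunk = Dict[str, List[float]]


def update_max(maxima: Dict[str, float], argmaxima: Dict[str, int],
               chunk: Chunk, offset: int) -> None:
    """Update running maxima and their global row indices (hypothetical helper)."""
    for name, values in chunk.items():
        for i, v in enumerate(values):
            if name not in maxima or v > maxima[name]:
                maxima[name] = v
                argmaxima[name] = offset + i  # global row index (assumed 0-based)


maxima: Dict[str, float] = {}
argmaxima: Dict[str, int] = {}
offset = 0
for chunk in ({"a": [1.0, 5.0], "b": [2.0, 0.5]},
              {"a": [3.0, 7.0], "b": [1.5, 1.0]}):
    update_max(maxima, argmaxima, chunk, offset)
    offset += len(chunk["a"])  # all columns of a chunk have the same length
print(maxima, argmaxima)
```

The same pattern with `<` instead of `>` gives running minima.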
Min / IdxMin#
- class progressivis.stats.min.Min#
Computes the minimum of the values for every column
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided, all input columns are processed
- Output slots:
result (PDict) – Dictionary of minimum values where every key represents a column
- class progressivis.stats.idxmin.IdxMin#
- Module parameters:
history (int64) – default: 33, the number of successive results to be kept
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. The input table
- Output slots:
- __init__(**kwds)#
Random sampling#
- class progressivis.stats.random_table.RandomPTable#
Random table generator module
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Output slots:
result (PTable) – Result table
- __init__(columns, rows=-1, random=<bound method RandomState.rand of RandomState(MT19937)>, dtype='float64', throttle=False, **kwds)#
- Parameters:
columns (Union[int, Sequence[str]]) – columns definition: if it is an int, it provides the number (n) of columns, named _1 … _n; otherwise it provides a sequence of column names
rows (int) – if positive (= n), stops generation after n rows; otherwise generation is unbounded
random (Callable[..., ndarray[Any, Any]]) – numpy random function, default is numpy.random.rand
dtype (str) – numpy-like data type
throttle (Union[int, integer[Any], float, bool]) – limit on the number of rows to be generated in a step
kwds (Any) – extra keyword args to be passed to the Module superclass
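The naming rule for the `columns` argument can be restated as a tiny function. This is only a sketch of the rule as documented above; `column_names` is a hypothetical name and is not part of the library.

```python
from typing import List, Sequence, Union


def column_names(columns: Union[int, Sequence[str]]) -> List[str]:
    """Expand a `columns` argument into explicit names (hypothetical helper)."""
    if isinstance(columns, int):
        # An int n stands for n columns named _1 ... _n.
        return [f"_{i}" for i in range(1, columns + 1)]
    # Otherwise the argument is already a sequence of names.
    return list(columns)


print(column_names(3))
print(column_names(["x", "y"]))
```

This also explains why the examples in this page refer to columns such as `_3` or `_5` when a table is created from an integer.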
- class progressivis.stats.blobs_table.BlobsPTable#
Isotropic Gaussian blobs table generator based on the sklearn.datasets.make_blobs function
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Output slots:
- __init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, centers, **kwds)#
- Parameters:
columns (Union[int, List[str], ndarray[Any, Any]]) – columns definition: if it is an int, it provides the number (n) of columns, named _1 … _n; otherwise it provides a sequence of column names
rows (int) – if positive (= n), stops generation after n rows; otherwise generation is unbounded
dtype (str) – numpy-like data type
seed (int) – used to initialize the random number generator
throttle (Union[int, bool, float]) – limit on the number of rows to be generated in a step
centers (Any) – number of centers to generate, or the fixed center locations
kwds (Any) – extra keyword args to be passed to sklearn.datasets.make_blobs
- class progressivis.stats.blobs_table.MVBlobsPTable#
Multivariate normal distribution blobs table generator based on the numpy.random.multivariate_normal function
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Output slots:
- __init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, means, covs, **kwds)#
- Parameters:
columns (Union[int, List[str], ndarray[Any, Any]]) – columns definition: if it is an int, it provides the number (n) of columns, named _1 … _n; otherwise it provides a sequence of column names
rows (int) – if positive (= n), stops generation after n rows; otherwise generation is unbounded
dtype (str) – numpy-like data type
seed (int) – used to initialize the random number generator
throttle (Union[int, bool, float]) – limit on the number of rows to be generated in a step
means (Any) – sequence of 1-D array_like of length N for N-dimensional blobs
covs (Any) – sequence of 2-D array_like of shape (N, N), representing the covariance matrix of the distribution for N-dimensional blobs
kwds (Any) – extra keyword args to be passed to numpy.random.multivariate_normal
Stats#
- class progressivis.stats.stats.Stats#
Computes the minimum and the maximum of the values for a single column
- Module parameters:
history (int64) – default: 3, the number of successive results to be kept
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – The input table
- Output slots:
result (PTable) – Result table containing two columns (for the min and max values)
- __init__(column, min_column=None, max_column=None, reset_index=False, **kwds)#
- Parameters:
column (Union[str, int]) – the name or the position of the column to be processed
min_column (Optional[str]) – the name of the minimum column in the result table. When missing, the given name is _<column>_min
max_column – the name of the maximum column in the result table. When missing, the given name is _<column>_max
kwds (Any) – extra keyword args to be passed to the Module superclass
Sample#
- class progressivis.stats.sample.Sample#
Reservoir Sampling module
- Module parameters:
samples (int64) – default: 50
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable)
- Output slots:
result (PTableSelectedView)
select (PIntSet) – required: False, attr_name: pintset, custom_attr: True
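For readers unfamiliar with reservoir sampling, the following plain-Python sketch shows the classic "Algorithm R" variant, which keeps a uniform sample of fixed size from a stream of unknown length. Whether the module uses exactly this variant is an assumption; `reservoir_sample` is a hypothetical helper.

```python
import random
from typing import Iterable, List


def reservoir_sample(stream: Iterable[int], k: int,
                     rng: random.Random) -> List[int]:
    """Keep a uniform random sample of k items from a stream (Algorithm R)."""
    reservoir: List[int] = []
    for n, item in enumerate(stream):
        if n < k:
            # Fill the reservoir with the first k items.
            reservoir.append(item)
        else:
            # Replace a random slot with probability k / (n + 1).
            j = rng.randint(0, n)  # both bounds are inclusive
            if j < k:
                reservoir[j] = item
    return reservoir


# 50 matches the default of the `samples` parameter above.
sample = reservoir_sample(range(1000), k=50, rng=random.Random(0))
print(len(sample))
```

Only the reservoir is kept in memory, so the sample can be maintained while rows keep arriving.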
Variance#
- class progressivis.stats.var.Var#
Computes the variance for every column of an input table.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. The input table
- Output slots:
result (PDict) – Dictionary of variances where every key represents a column
- __init__(ignore_string_cols=False, **kwds)#
- class progressivis.stats.var.VarH#
Compute the variance of the columns of an input table. This variant keeps history
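A variance can be maintained incrementally without storing the data, for example with Welford's online algorithm. The sketch below is plain Python for a single column and is not taken from the library; in particular, the choice of the sample variance (division by n - 1) is an assumption, and `RunningVariance` is a hypothetical class.

```python
from typing import Iterable


class RunningVariance:
    """Welford's online algorithm for one column (hypothetical helper)."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the current mean

    def update(self, values: Iterable[float]) -> None:
        for x in values:
            self.count += 1
            delta = x - self.mean
            self.mean += delta / self.count
            self.m2 += delta * (x - self.mean)

    def variance(self) -> float:
        # Sample variance (assumption); undefined for fewer than 2 values.
        if self.count < 2:
            return float("nan")
        return self.m2 / (self.count - 1)


rv = RunningVariance()
for chunk in ([1.0, 2.0, 3.0], [4.0, 5.0, 6.0]):
    rv.update(chunk)
print(rv.variance())
```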
Clustering Modules#
Linear Algebra Modules#
Element-wise processing modules#
These modules apply numpy universal functions (a.k.a. ufuncs) to the columns of one or two input tables.
Depending on the applied ufunc arity, we distinguish several categories of modules.
Unary modules#
These modules apply a unary ufunc to all columns or a subset of columns of the input table, for example:
- class progressivis.linalg.Absolute#
Applies numpy.absolute over all input columns or over a subset
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided, all input columns are processed
- Output slots:
result (PTable) – required: False, datashape: {'table': '#columns'}. The output table follows the structure of table
The other unary modules have the same interface as the Absolute module above. They are:
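| Module name | Underlying universal function |
|---|---|
| Absolute | numpy.absolute |
| Arccos | numpy.arccos |
| Arccosh | numpy.arccosh |
| Arcsin | numpy.arcsin |
| Arcsinh | numpy.arcsinh |
| Arctan | numpy.arctan |
| Arctanh | numpy.arctanh |
| Cbrt | numpy.cbrt |
| Ceil | numpy.ceil |
| Conjugate | numpy.conjugate |
| Cos | numpy.cos |
| Cosh | numpy.cosh |
| Deg2rad | numpy.deg2rad |
| Degrees | numpy.degrees |
| Exp | numpy.exp |
| Exp2 | numpy.exp2 |
| Expm1 | numpy.expm1 |
| Fabs | numpy.fabs |
| Floor | numpy.floor |
| Frexp | numpy.frexp |
| Invert | numpy.invert |
| Isfinite | numpy.isfinite |
| Isinf | numpy.isinf |
| Isnan | numpy.isnan |
| Isnat | numpy.isnat |
| Log | numpy.log |
| Log10 | numpy.log10 |
| Log1p | numpy.log1p |
| Log2 | numpy.log2 |
| LogicalNot | numpy.logical_not |
| Modf | numpy.modf |
| Negative | numpy.negative |
| Positive | numpy.positive |
| Rad2deg | numpy.rad2deg |
| Radians | numpy.radians |
| Reciprocal | numpy.reciprocal |
| Rint | numpy.rint |
| Sign | numpy.sign |
| Signbit | numpy.signbit |
| Sin | numpy.sin |
| Sinh | numpy.sinh |
| Spacing | numpy.spacing |
| Sqrt | numpy.sqrt |
| Square | numpy.square |
| Tan | numpy.tan |
| Tanh | numpy.tanh |
| Trunc | numpy.trunc |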
Binary modules#
These modules apply a binary ufunc to two sets of columns belonging either to the same input table (for example ColsAdd below) or to two distinct tables (for example Add below):
- class progressivis.linalg.Add#
Applies numpy.add over two sets of columns. One of them belongs to the first input table and the other belongs to the second
Examples:
from progressivis import Sink, Scheduler, RandomPTable, Add

#
# every column in random1 is added to the column in random2 at the same position
#
scheduler = Scheduler()
with scheduler:
    random1 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result
    module.input.second = random2.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result

#
# columns _3, _5, _7 in random1 are added to columns _4, _6, _8 in random2
#
scheduler = Scheduler()
with scheduler:
    # 10 columns (_1 ... _10), so that the columns named in the hints exist
    random1 = RandomPTable(10, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(10, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result["_3", "_5", "_7"]
    module.input.second = random2.output.result["_4", "_6", "_8"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
- class progressivis.linalg.ColsAdd#
Example:
from progressivis import Sink, Scheduler, RandomPTable
from progressivis.linalg import ColsAdd

#
# columns _3, _5, _7 are added to columns _4, _6, _8
#
scheduler = Scheduler()
with scheduler:
    # 10 columns (_1 ... _10), so that the columns named in the hint exist
    random = RandomPTable(10, rows=100_000, scheduler=scheduler)
    module = ColsAdd(
        cols_out=["x", "y", "z"],
        scheduler=scheduler,
    )
    module.input.table = random.output.result[["_3", "_5", "_7"], ["_4", "_6", "_8"]]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
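To make the column pairing concrete, here is a plain-Python sketch of what the example above appears to compute: the two column lists are paired position by position and each pair yields one output column. The semantics are inferred from the example's comment, and `cols_binary` is a hypothetical helper rather than a progressivis function.

```python
import operator
from typing import Callable, Dict, List, Sequence

Table = Dict[str, List[float]]


def cols_binary(table: Table, first: Sequence[str], second: Sequence[str],
                cols_out: Sequence[str],
                op: Callable[[float, float], float] = operator.add) -> Table:
    """Apply a binary operation to paired columns of one table (hypothetical)."""
    return {
        out: [op(a, b) for a, b in zip(table[f], table[s])]
        for f, s, out in zip(first, second, cols_out)
    }


# Illustrative data only.
table = {"_3": [1.0, 2.0], "_4": [10.0, 20.0],
         "_5": [3.0, 4.0], "_6": [30.0, 40.0]}
result = cols_binary(table, ["_3", "_5"], ["_4", "_6"], ["x", "y"])
print(result)
```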
The other binary modules have the same interface as the Add and ColsAdd modules above. They are:
| Module name | Underlying universal function |
|---|---|
| Add / ColsAdd / AddReduce | numpy.add |
| Arctan2 / ColsArctan2 / Arctan2Reduce | numpy.arctan2 |
| BitwiseAnd / ColsBitwiseAnd / BitwiseAndReduce | numpy.bitwise_and |
| BitwiseOr / ColsBitwiseOr / BitwiseOrReduce | numpy.bitwise_or |
| BitwiseXor / ColsBitwiseXor / BitwiseXorReduce | numpy.bitwise_xor |
| Copysign / ColsCopysign / CopysignReduce | numpy.copysign |
| Divide / ColsDivide / DivideReduce | numpy.divide |
| Divmod / ColsDivmod / DivmodReduce | numpy.divmod |
| Equal / ColsEqual / EqualReduce | numpy.equal |
| FloatPower / ColsFloatPower / FloatPowerReduce | numpy.float_power |
| FloorDivide / ColsFloorDivide / FloorDivideReduce | numpy.floor_divide |
| Fmax / ColsFmax / FmaxReduce | numpy.fmax |
| Fmin / ColsFmin / FminReduce | numpy.fmin |
| Fmod / ColsFmod / FmodReduce | numpy.fmod |
| Gcd / ColsGcd / GcdReduce | numpy.gcd |
| Greater / ColsGreater / GreaterReduce | numpy.greater |
| GreaterEqual / ColsGreaterEqual / GreaterEqualReduce | numpy.greater_equal |
| Heaviside / ColsHeaviside / HeavisideReduce | numpy.heaviside |
| Hypot / ColsHypot / HypotReduce | numpy.hypot |
| Lcm / ColsLcm / LcmReduce | numpy.lcm |
| Ldexp / ColsLdexp / LdexpReduce | numpy.ldexp |
| LeftShift / ColsLeftShift / LeftShiftReduce | numpy.left_shift |
| Less / ColsLess / LessReduce | numpy.less |
| LessEqual / ColsLessEqual / LessEqualReduce | numpy.less_equal |
| Logaddexp / ColsLogaddexp / LogaddexpReduce | numpy.logaddexp |
| Logaddexp2 / ColsLogaddexp2 / Logaddexp2Reduce | numpy.logaddexp2 |
| LogicalAnd / ColsLogicalAnd / LogicalAndReduce | numpy.logical_and |
| LogicalOr / ColsLogicalOr / LogicalOrReduce | numpy.logical_or |
| LogicalXor / ColsLogicalXor / LogicalXorReduce | numpy.logical_xor |
| Maximum / ColsMaximum / MaximumReduce | numpy.maximum |
| Minimum / ColsMinimum / MinimumReduce | numpy.minimum |
| Mod / ColsMod / ModReduce | numpy.mod |
| Multiply / ColsMultiply / MultiplyReduce | numpy.multiply |
| Nextafter / ColsNextafter / NextafterReduce | numpy.nextafter |
| NotEqual / ColsNotEqual / NotEqualReduce | numpy.not_equal |
| Power / ColsPower / PowerReduce | numpy.power |
| Remainder / ColsRemainder / RemainderReduce | numpy.remainder |
| RightShift / ColsRightShift / RightShiftReduce | numpy.right_shift |
| Subtract / ColsSubtract / SubtractReduce | numpy.subtract |
| TrueDivide / ColsTrueDivide / TrueDivideReduce | numpy.true_divide |
Reduce modules#
These modules reduce the dimension of the input table columns by one by applying a ufunc, for example:
- class progressivis.linalg.AddReduce#
Applies numpy.add.reduce over all input columns or over a subset of them
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
table (PTable) – accepts Sequence[str] as hint. Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided, all input columns are processed
- Output slots:
result (PDict) – The key names follow the input table columns
The other reduce modules have the same interface as the AddReduce module above. They are listed in the previous table.
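The shape of a reduce result (one value per column, keyed by column name) can be sketched with the standard library alone. This is an illustration of the idea, not the library's code: `reduce_columns` is a hypothetical helper and `functools.reduce` stands in for a ufunc's `reduce` method.

```python
import operator
from functools import reduce
from typing import Callable, Dict, List, Optional, Sequence

Table = Dict[str, List[int]]


def reduce_columns(table: Table, op: Callable[[int, int], int],
                   columns: Optional[Sequence[str]] = None) -> Dict[str, int]:
    """Reduce each selected column to a single value (hypothetical helper)."""
    # Without a selection, every column is processed.
    names = list(table) if columns is None else list(columns)
    return {name: reduce(op, table[name]) for name in names}


table = {"_1": [1, 2, 3], "_2": [10, 20, 30]}
totals = reduce_columns(table, operator.add)
subset = reduce_columns(table, operator.mul, ["_1"])
print(totals, subset)
```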
Decorators#
One can transform a simple function into an element-wise module via one of three decorators: @unary_module, @binary_module and @reduce_module.
Examples:
import numpy as np
from progressivis.linalg import unary_module, binary_module, reduce_module
from typing import cast


@unary_module
def CustomUnary(x: float) -> float:
    return cast(float, (x + np.sin(x)) / (x + np.cos(x)))


@binary_module
def CustomBinary(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))


@reduce_module
def CustomBinaryReduce(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))
Declarative modules#
One can create new modules in a declarative way as subclasses of linalg.mixufunc.MixUfuncABC.
Examples:
import numpy as np
from progressivis.linalg.mixufunc import MixUfuncABC
from progressivis import PTable, def_input, def_output
from typing import Any


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class MixUfuncSample(MixUfuncABC):
    """
    Explanation: the result table has two columns "_1" and "_2" which are calculated
    with the underlying expressions
    NB: Here, columns in first and second table are supposed to be _1, _2, ...
    """
    expr = {"_1": (np.add, "first._2", "second._3"), "_2": (np.log, "second._3")}


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncSample2(MixUfuncABC):
    """
    The output types can be coerced if necessary
    """
    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }


def custom_unary(x: float) -> float:
    return (x + np.sin(x)) / (x + np.cos(x))  # type: ignore


# wrap the Python function as a ufunc with 1 input and 1 output
custom_unary_ufunc: Any = np.frompyfunc(custom_unary, 1, 1)


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomUnary(MixUfuncABC):
    """
    Module using a custom unary function
    """
    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (custom_unary_ufunc, "second._3"),
    }


def custom_binary(x: float, y: float) -> float:
    return (x + np.sin(y)) / (x + np.cos(y))  # type: ignore


# wrap the Python function as a ufunc with 2 inputs and 1 output
custom_binary_ufunc: Any = np.frompyfunc(custom_binary, 2, 1)


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomBinary(MixUfuncABC):
    """
    Module using a custom binary function
    """
    expr = {
        "_1:float64": (custom_binary_ufunc, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }
Declarative modules based on numexpr expressions can be created using NumExprABC this way:
from progressivis.linalg.nexpr import NumExprABC
from progressivis import PTable, def_input, def_output


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class NumExprSample(NumExprABC):
    """
    NB: Here, columns in first and second table are supposed to be _1, _2, ...
    """
    expr = {"_1": "{first._2}+2*{second._3}", "_2": "{first._3}-5*{second._2}"}


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class NumExprSample2(NumExprABC):
    """
    The output types can be coerced if necessary
    """
    expr = {
        "_1:float64": "{first._2}+2*{second._3}",
        "_2:float64": "{first._3}-5*{second._2}",
    }
Linear mapping#
Linear transformation can be performed via this module:
- class progressivis.linalg.linear_map.LinearMap#
Performs a linear transformation on the rows of the vectors table. The transformation table (or a subset of its columns) provides the transformation matrix once all its rows are read. Its number of rows has to be equal to the number of columns of vectors.
- Module parameters:
quantum (float64) – default: 0.5
debug (bool) – default: False
- Input slots:
vectors (PTable) – accepts Sequence[str] as hint. Table providing the row vectors. A selection of columns could be provided via a hint (see example). When no hint is provided, all input columns are used
transformation (PTable) – accepts Sequence[str] as hint. Table providing the transformation matrix. A selection of columns could be provided via a hint (same behaviour as vectors)
- Output slots:
result (PTable) – Result table, follows the structure of transformation
Example:
from progressivis import Sink, Scheduler, RandomPTable
from progressivis.linalg.linear_map import LinearMap

scheduler = Scheduler()
with scheduler:
    vectors = RandomPTable(20, rows=100000, scheduler=scheduler)
    # 3 rows, matching the 3 columns selected from vectors below
    transf = RandomPTable(20, rows=3, scheduler=scheduler)
    module = LinearMap(scheduler=scheduler)
    module.input.vectors = vectors.output.result["_3", "_4", "_5"]
    module.input.transformation = transf.output.result["_4", "_5", "_6", "_7"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
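The shape constraint in the example above (3 selected vector columns, a 3-row by 4-column transformation, 4 result columns) can be checked with a small plain-Python sketch of a row-vector by matrix product. The helper name `linear_map` and the data are illustrative only; the module itself works on tables.

```python
from typing import List

Matrix = List[List[float]]


def linear_map(vectors: Matrix, matrix: Matrix) -> Matrix:
    """Multiply each row vector by the matrix (hypothetical helper)."""
    n_rows = len(matrix)
    n_cols = len(matrix[0])
    for v in vectors:
        # The matrix needs one row per vector component.
        if len(v) != n_rows:
            raise ValueError("vector length must equal the number of matrix rows")
    return [
        [sum(v[i] * matrix[i][j] for i in range(n_rows)) for j in range(n_cols)]
        for v in vectors
    ]


matrix = [[1.0, 0.0, 0.0, 1.0],
          [0.0, 1.0, 0.0, 1.0],
          [0.0, 0.0, 1.0, 1.0]]          # 3 rows x 4 columns
vectors = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]  # rows with 3 components
mapped = linear_map(vectors, matrix)
print(mapped)
```

Each output row has as many entries as the matrix has columns, which is why the result follows the structure of the transformation table.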