Module Library#
I/O Modules#
Simple CSV Loader#
- class progressivis.io.simple_csv_loader.SimpleCSVLoader#
  This module reads comma-separated values (CSV) files progressively into a PTable. Optionally, it provides information about missing values and anomalies via the anomalies and missing output slots. Internally it uses pandas.read_csv.
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
  - Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, imputer=None, **kwds)#
  - Parameters:
    - filepath_or_buffer (Optional[Any]) – str, path object or file-like object accepted by pandas.read_csv
    - filter_ (Optional[Callable[[DataFrame], DataFrame]]) – filtering function to be applied to input data at loading time
      Example
      >>> def filter_(df):
      ...     lon = df['dropoff_longitude']
      ...     lat = df['dropoff_latitude']
      ...     return df[(lon > -74.10) & (lon < -73.7) & (lat > 40.60) & (lat < 41)]
    - force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
    - fillvalues (Optional[Dict[str, Any]]) – the default values of the columns, specified as a dictionary (see PTable)
    - throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step
    - imputer (Optional[SimpleImputer]) – a SimpleImputer provides basic strategies for imputing missing values
    - kwds (Any) – extra keyword args to be passed to pandas.read_csv and the Module superclass
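The step-wise loading idea behind this module can be sketched with the stdlib csv module. This is a simplification, not the actual SimpleCSVLoader implementation; `chunk_size` is an illustrative stand-in for the quantum-driven step sizing:

```python
import csv
import io

def load_progressively(fileobj, chunk_size=2):
    """Yield successive chunks of parsed rows, mimicking step-wise loading."""
    reader = csv.DictReader(fileobj)
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            yield chunk  # one scheduler step's worth of rows
            chunk = []
    if chunk:
        yield chunk

data = io.StringIO("a,b\n1,2\n3,4\n5,6\n")
chunks = list(load_progressively(data, chunk_size=2))
```

Each yielded chunk would be appended to the output PTable, so downstream modules can start working before the file is fully read.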
Reliable CSV Loader#
- class progressivis.io.csv_loader.CSVLoader#
  This module reads comma-separated values (CSV) files progressively into a PTable. It is able to resume reading after a network failure or a crash. ⚠️ This module does not wait for filenames!
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
  - Output slots:
    - result (PTable) – Provides the read data in a PTable object
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, as_array=None, timeout=None, save_context=True, recovery=False, recovery_tag='', recovery_table_size=3, save_step_size=100000, **kwds)#
  - Parameters:
    - filepath_or_buffer (Optional[Any]) – str, path object or file-like object accepted by pandas.read_csv. NB: the filenames slot and the filepath_or_buffer init parameter cannot both be defined
    - filter_ (Optional[Callable[[DataFrame], DataFrame]]) – filtering function to be applied to input data at loading time
      Example
      >>> def filter_(df):
      ...     lon = df['dropoff_longitude']
      ...     lat = df['dropoff_latitude']
      ...     return df[(lon > -74.10) & (lon < -73.7) & (lat > 40.60) & (lat < 41)]
    - force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
    - fillvalues (Optional[Dict[str, Any]]) – the default values of the columns, specified as a dictionary (see PTable)
    - as_array (Union[str, Dict[str, List[str]], Callable[[List[str]], Dict[str, List[str]]], None]) – allows lists of n input columns to be grouped into n-D output columns (grouped column types must be identical):
      - when as_array is a Dict[str, List[str]], every entry represents a grouping rule
        Example:
        >>> CSVLoader(
        ...     get_dataset("bigfile"),
        ...     index_col=False,
        ...     as_array={
        ...         "first_group": ["_1", "_2", "_3"],
        ...         "second_group": ["_11", "_12", "_13"],
        ...     },
        ...     header=None,
        ...     scheduler=s,
        ... )
      - when as_array is a str, it gives the name of a unique n-D output column which groups all input columns
      - when as_array is a Callable, it must be able to process the list of all input columns and return a Dict[str, List[str]]
        Example:
        >>> CSVLoader(
        ...     get_dataset("mnist_784"),
        ...     index_col=False,
        ...     as_array=lambda cols: {"array": [c for c in cols if c != "class"]},
        ...     scheduler=s,
        ... )
    - timeout (Optional[float]) – stop waiting for a response and raise an exception after the given number of seconds
    - save_context (bool) – when True and filepath_or_buffer designates a recoverable support (i.e. a file path), save the information allowing recovery after a failure. NB: currently backup only works when the data input is provided via filepath_or_buffer
    - recovery (bool) – when False (default) the data entry is read from the beginning, otherwise the previous reading is resumed using the saved information
    - recovery_tag (Union[str, int]) – customize save tags (useful mainly for debugging)
    - recovery_table_size (int) – defines the size of the recovery table (useful mainly for debugging)
    - save_step_size (int) – defines the number of rows to read between two context snapshots
    - kwds (Any) – extra keyword args to be passed to pandas.read_csv and the Module superclass
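The resume-after-failure mechanism can be sketched as checkpointing a read position and restarting from it. This is an illustrative stdlib sketch, not the actual CSVLoader recovery code (which snapshots more context than a byte offset):

```python
import io

def read_rows(buf, offset, max_rows):
    """Read up to max_rows lines starting at a saved offset.

    Returns (rows, new_offset); new_offset acts as the recovery snapshot.
    """
    buf.seek(offset)
    rows = []
    for _ in range(max_rows):
        line = buf.readline()
        if not line:
            break
        rows.append(line.rstrip("\n"))
    return rows, buf.tell()

buf = io.StringIO("a\nb\nc\nd\n")
rows1, ckpt = read_rows(buf, 0, 2)   # first step; imagine a crash here
rows2, _ = read_rows(buf, ckpt, 10)  # recovery=True: resume from the snapshot
```

No row is read twice and none is skipped, which is the property the recovery table is there to guarantee.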
CSV Loader via PyArrow#
- class progressivis.io.pa_csv_loader.PACSVLoader#
  This module reads comma-separated values (CSV) files progressively into a PTable using the PyArrow backend.
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
  - Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, read_options=None, parse_options=None, convert_options=None, drop_na=True, **kwds)#
  - Parameters:
    - filepath_or_buffer (Optional[Any]) – string, path object or file-like object accepted by pyarrow.csv.open_csv
    - filter_ (Optional[Callable[[RecordBatch], RecordBatch]]) – filtering function to be applied to input data at loading time
    - force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
    - fillvalues (Optional[Dict[str, Any]]) – the default values of the columns, specified as a dictionary (see PTable)
    - throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step
    - read_options (Optional[ReadOptions]) – options for the CSV reader (see pyarrow.csv.ReadOptions)
    - parse_options (Optional[ParseOptions]) – options for the CSV parser (see pyarrow.csv.ParseOptions)
    - convert_options (Optional[ConvertOptions]) – options for converting CSV data (see pyarrow.csv.ConvertOptions)
    - drop_na (Optional[bool]) – drop rows containing unassigned and/or invalid values. NB: invalid values are replaced by NA values and reported as invalid in the anomalies output slot
    - kwds (Any) – extra keyword args to be passed to the Module superclass
- property parser: CSVStreamingReader#
  When the data contains invalid values, pyarrow.csv.open_csv can raise an ArrowInvalid exception (even though no rows have been fetched yet), so a late instantiation of the CSVStreamingReader is preferred
- get_progress()#
  Return a tuple of numbers (current, total) where current is the current progress value and total is the total number of values to process; these values can change during the computation.
- recovering()#
  Transforms invalid values into NA values
  - Return type:
Parquet Loader via PyArrow#
- class progressivis.io.parquet_loader.ParquetLoader#
  This module reads parquet files progressively into a PTable using the PyArrow backend.
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - filenames (PTable) – required: False. Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ The filenames slot and the filepath_or_buffer init parameter cannot both be defined!
  - Output slots:
- __init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, columns=None, **kwds)#
  - Parameters:
    - filepath_or_buffer (Optional[Any]) – string, path object or file-like object accepted by pyarrow.parquet.ParquetFile
    - filter_ (Optional[Callable[[RecordBatch], RecordBatch]]) – filtering function to be applied to input data at loading time
    - force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition
    - fillvalues (Optional[Dict[str, Any]]) – the default values of the columns, specified as a dictionary (see PTable)
    - throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step
    - read_options – options for the CSV reader (see pyarrow.csv.ReadOptions)
    - parse_options – options for the CSV parser (see pyarrow.csv.ParseOptions)
    - convert_options – options for converting CSV data (see pyarrow.csv.ConvertOptions)
    - kwds (Any) – extra keyword args to be passed to pyarrow.parquet.ParquetFile, pyarrow.parquet.iter_batches and the Module superclass
Constant Module#
- class progressivis.table.constant.Constant#
  Module providing a constant output PTable slot
Variable Module#
- class progressivis.io.variable.Variable#
  This module allows external events (coming from widgets, commands, etc.) to be taken into account
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Output slots:
    - result (PDict) – Returns the dictionary provided by from_input() after translation (if applicable)
- __init__(init_val=None, translation=None, **kwds)#
  - Parameters:
    - init_val (Optional[Dict[str, Any]]) – initial value for the result
    - translation (Optional[Dict[str, Any]]) – sometimes the dictionaries provided via from_input() contain multiple aliases for the same canonical key (e.g. pickup_latitude and dropoff_latitude as aliases for latitude). To fix that, translation should provide an alias-to-canonical-key mapping dictionary (e.g. {'pickup_latitude': 'latitude', 'dropoff_latitude': 'latitude'})
    - kwds (Any) – extra keyword args to be passed to the Module superclass
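The effect of the translation mapping can be sketched in a few lines. This is an illustrative sketch of the key-rewriting behavior described above, not the Variable module itself:

```python
def translate(event, translation):
    """Rewrite alias keys to their canonical names; other keys pass through."""
    return {translation.get(key, key): value for key, value in event.items()}

translation = {"pickup_latitude": "latitude", "dropoff_latitude": "latitude"}
out = translate({"pickup_latitude": 40.7, "fare": 12.5}, translation)
```

Whichever alias the widget emits, downstream modules only ever see the canonical key.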
Data Filtering Modules#
Simple Filtering Module#
- class progressivis.table.simple_filter.SimpleFilter#
  Filtering module based on a simple condition of the form <column> <operator> <value> where <operator> := '>' | '>=' | '<' | '<='
  - Module parameters:
    - column (object) – default: unknown, filtering column (the condition's left operand)
    - op (object) – default: '>', relational operator (i.e. '>', '>=', '<', '<=')
    - value_key (object) – default: '', see the value input below
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (PTable) – input table or view
    - value (PDict) – contains the condition's right operand. If value_key is provided, the right operand is value[value_key], otherwise the first value in the dict is used
    - hist (PTable) – BinningIndex module output connected to the same input/column. This mandatory input can be provided by the create_dependent_modules() method.
  - Output slots:
    - result (PTableSelectedView)
- __init__(**kwds)#
  - Parameters:
    - kwds (Any) – extra keyword args to be passed to the Module superclass
- create_dependent_modules(input_module, input_slot, hist_index=None, **kwds)#
  Creates a default configuration.
  - Parameters:
  - Return type:
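The condition form above can be sketched with the stdlib operator module. This is an illustrative sketch of the `<column> <operator> <value>` semantics, not the progressive SimpleFilter implementation (which uses the BinningIndex to avoid rescanning rows):

```python
import operator

# The four relational operators SimpleFilter supports.
OPS = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le}

def simple_filter(rows, column, op, value):
    """Return the indices of rows satisfying <column> <op> <value>."""
    compare = OPS[op]
    return [i for i, row in enumerate(rows) if compare(row[column], value)]

rows = [{"x": 1.0}, {"x": 0.2}, {"x": 3.5}]
selected = simple_filter(rows, "x", ">", 0.5)
```

The returned index list plays the role of the PTableSelectedView: a selection over the input table rather than a copy of it.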
CMP Query Module (via NumExpr)#
- class progressivis.table.cmp_query.CmpQueryLast#
  Filtering module applying to the rows of table a constraint based on all values of the last row of cmp, of the form:
  (table[:, col1] <op> last[col1]) <combine> (table[:, col2] <op> last[col2]) ... (table[:, colN] <op> last[colN])
  where:
  - last is the last row of the cmp table
  - <op> := '>' | '>=' | '<' | '<=' | '==' | '!='
  - <combine> := 'and' | 'or' | 'xor'
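The constraint above can be sketched in plain Python. This is an illustrative sketch of the semantics (the real module evaluates the expression with NumExpr over table columns); `op` and `combine` stand in for `<op>` and `<combine>`:

```python
import operator

def cmp_query_last(table, cmp_table, op=operator.gt, combine=all):
    """Select rows where (row[col] <op> last[col]) holds, combined over
    all columns of the last row of cmp_table (combine=all means 'and',
    combine=any means 'or')."""
    last = cmp_table[-1]
    return [i for i, row in enumerate(table)
            if combine(op(row[col], last[col]) for col in last)]

table = [{"a": 1, "b": 5}, {"a": 3, "b": 7}, {"a": 0, "b": 9}]
cmp_table = [{"a": 2, "b": 6}]
hits = cmp_query_last(table, cmp_table)  # rows where a > 2 and b > 6
```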
Numerical Expression Filtering Module (via NumExpr)#
- class progressivis.table.filtermod.FilterMod#
  Filtering module based on the numexpr library.
  - Module parameters:
  - Input slots:
    - table (PTable) – Data input
  - Output slots:
    - result (PTableSelectedView) – Returns a filtered view
Example:
from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.filtermod import FilterMod
scheduler = Scheduler()
with scheduler:
random = RandomPTable(2, rows=100000, scheduler=scheduler)
filter_ = FilterMod(scheduler=scheduler)
filter_.params.expr = "_1 > 0.5"
filter_.input.table = random.output.result
sink = Sink(scheduler=scheduler)
sink.input.inp = filter_.output.result
The underlying graph:
The Range Query Module#
- class progressivis.table.range_query.RangeQuery#
  Selects rows that contain values within a provided range along a given axis (column)
  - Module parameters:
    - column (object) – default: unknown, the column in the table input slot concerned by the query. This parameter is mandatory
    - watched_key_lower (object) – default: '', the key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column parameter is used instead.
    - watched_key_upper (object) – default: '', the key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column parameter is used instead.
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - lower (PDict) – Provides a PDict object containing the lower bound of the query. The key giving the bound is set by the watched_key_lower parameter when it is different from the column parameter.
    - upper (PDict) – Provides a PDict object containing the upper bound of the query. The key giving the bound is set by the watched_key_upper parameter when it is different from the column parameter.
    - min (PDict) – The minimum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.
    - max (PDict) – The maximum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.
    - timestamps (PDict) – required: False. Gives information about bins changed between two run steps
    - index (PTable) – BinningIndex module output connected to the same input/column. This mandatory input can be provided by the create_dependent_modules() method.
  - Output slots:
    - result (PTableSelectedView) – Query main result
    - min (PDict) – required: False, attr_name: _min_table. Min value of the output data
    - max (PDict) – required: False, attr_name: _max_table. Max value of the output data
- __init__(approximate=False, **kwds)#
- create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, hist_index=None, **kwds)#
  Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, a min module is created and connected. With False, it is not created and not connected.
  - Return type:
The RangeQuery module is not self-sufficient: it needs other modules to work. A simple way to provide it with an environment that allows it to work properly is to use the create_dependent_modules() method. This convenience method creates a set of modules connected to RangeQuery that produce the inputs required for its operation in most cases.
In the example below it is called with the default values:
from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.range_query import RangeQuery
scheduler = Scheduler()
with scheduler:
random = RandomPTable(2, rows=100000, scheduler=scheduler)
range_qry = RangeQuery(column="_1", scheduler=scheduler)
range_qry.create_dependent_modules(random, "result")
sink = Sink(scheduler=scheduler)
sink.input.inp = range_qry.output.result
sink.input.inp = range_qry.output.min
sink.input.inp = range_qry.output.max
And in this case it produces the following topology:
The Range Query 2D Module#
- class progressivis.table.range_query_2d.RangeQuery2d#
  Selects rows that contain values within a provided 2D interval (two columns)
  - Module parameters:
    - column_x (object) – default: unknown, the x axis column in the table input slot concerned by the query. This parameter is mandatory
    - column_y (object) – default: unknown, the y axis column in the table input slot concerned by the query. This parameter is mandatory
    - watched_key_lower_x (object) – default: '', the x axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column_x parameter is used instead.
    - watched_key_upper_x (object) – default: '', the x axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column_x parameter is used instead.
    - watched_key_lower_y (object) – default: '', the y axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ''), the column_y parameter is used instead.
    - watched_key_upper_y (object) – default: '', the y axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ''), the column_y parameter is used instead.
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - lower (PDict) – required: False. Provides a PDict object containing the 2D lower bound of the query. The x, y axis keys giving the bound are set by the watched_key_lower_{x|y} parameters when they are different from the column_{x|y} parameters.
    - upper (PDict) – required: False. Provides a PDict object containing the 2D upper bound of the query. The x, y axis keys giving the bound are set by the watched_key_upper_{x|y} parameters when they are different from the column_{x|y} parameters.
    - min (PDict) – The minimum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.
    - max (PDict) – The maximum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.
    - timestamps_x (PDict) – required: False. Gives information about bins changed between two run steps on the x axis
    - timestamps_y (PDict) – required: False. Gives information about bins changed between two run steps on the y axis
    - index_x (PTable) – BinningIndex module output connected to the x filtering input/column. This mandatory input can be provided by the create_dependent_modules() method.
    - index_y (PTable) – BinningIndex module output connected to the y filtering input/column. This mandatory input can be provided by the create_dependent_modules() method.
  - Output slots:
    - result (PTableSelectedView)
    - min (PDict) – required: False, attr_name: _min_table. Min value of the output data
    - max (PDict) – required: False, attr_name: _max_table. Max value of the output data
- __init__(approximate=False, **kwds)#
- create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, **kwds)#
  Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, a min module is created and connected. With False, it is not created and not connected.
  - Return type:
Just like RangeQuery, the RangeQuery2d module is not self-sufficient. In order to provide it with an environment, the create_dependent_modules() method can be used in the same way:
from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.range_query_2d import RangeQuery2d
scheduler = Scheduler()
with scheduler:
random = RandomPTable(2, rows=100000, scheduler=scheduler)
range_qry = RangeQuery2d(column_x="_1", column_y="_2", scheduler=scheduler)
range_qry.create_dependent_modules(random, "result")
sink = Sink(scheduler=scheduler)
sink.input.inp = range_qry.output.result
sink.input.inp = range_qry.output.min
sink.input.inp = range_qry.output.max
Categorical Query Module#
- class progressivis.table.categorical_query.CategoricalQuery#
  Selects rows whose values in a given column belong to a provided subset. It is convenient for categorical data.
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
  - Output slots:
    - result (PTableSelectedView)
Like the previous query modules, CategoricalQuery is not self-sufficient. Use create_dependent_modules() to initialize its environment:
from progressivis.core import Sink, Scheduler
from progressivis.io import CSVLoader
from progressivis.table.categorical_query import CategoricalQuery
from progressivis.table.constant import ConstDict
from progressivis.utils.psdict import PDict
scheduler = Scheduler()
with scheduler:
csv = CSVLoader("path/to/your/data.csv", scheduler=scheduler)
# NB: data.csv contains a categorical column named "category"
query = CategoricalQuery(column="category", scheduler=scheduler)
query.create_dependent_modules(input_module=csv)
ct = ConstDict(PDict({"only": ["A", "C"]}), scheduler=scheduler)
query.input.choice = ct.output.result
sink = Sink(scheduler=scheduler)
sink.input.inp = query.output.result
Indexing Modules#
Histogram Index Module#
- class progressivis.table.hist_index.HistogramIndex#
  Computes and maintains a histogram index by dividing the entire range of values of a column into a series of intervals (binning). Each bin contains the set of indices of the rows falling in the underlying interval, in order to provide fast access to these rows. HistogramIndex is able to build multiple indices, one for each column provided in the table slot hint.
  - Module parameters:
    - bins (int64) – default: 126
    - init_threshold (int64) – default: 1
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (PTable) – accepts Sequence[str] as hint
  - Output slots:
    - result (PTableSelectedView)
    - min_out (PDict) – required: False
    - max_out (PDict) – required: False
- __init__(**kwds)#
- query(column, operator_, limit, approximate=False)#
  Return the list of rows matching the query. For example, returning all values less than 10 (< 10) would be query(column, operator.lt, 10)
  - Return type:
- restricted_query(column, operator_, limit, only_locs, approximate=False)#
  Return the list of rows matching the query, restricted to only_locs. For example, returning all values less than 10 (< 10) would be restricted_query(column, operator.lt, 10, only_locs)
  - Return type:
- range_query_aslist(column, lower, upper, approximate=False)#
  Return the list of rows with values in the range [lower, upper[
- range_query(column, lower, upper, approximate=False)#
  Return the list of rows with values in the range [lower, upper[
  - Return type:
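The binning-index idea can be sketched with the stdlib bisect module. This is an illustrative sketch under simplifying assumptions (static data, equal-width bins), not the progressive HistogramIndex implementation:

```python
from bisect import bisect_left

def build_index(values, n_bins=4):
    """Assign each row id to a bin; each bin holds a set of row ids."""
    lo, hi = min(values), max(values)
    edges = [lo + (hi - lo) * i / n_bins for i in range(1, n_bins)]
    bins = [set() for _ in range(n_bins)]
    for i, v in enumerate(values):
        bins[bisect_left(edges, v)].add(i)
    return edges, bins

def range_query(values, edges, bins, lower, upper):
    """Rows with lower <= value < upper: only candidate bins are scanned."""
    first, last = bisect_left(edges, lower), bisect_left(edges, upper)
    out = set()
    for b in bins[first:last + 1]:
        out |= {i for i in b if lower <= values[i] < upper}
    return out

vals = [0.0, 2.5, 5.0, 7.5, 10.0]
edges, bins = build_index(vals, n_bins=4)
hits = range_query(vals, edges, bins, 2.0, 8.0)
```

Only the bins overlapping [lower, upper[ are inspected, which is what makes repeated range queries cheap compared to a full scan.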
Unique Index Module#
Data Grouping/Joining/Aggregation Modules#
Join-by-id Module#
Join Module#
- class progressivis.table.join.Join#
  {many|one}-to-one join module
  - Parameters:
    - primary_on – column or list of columns giving the primary key on the primary table
    - related_on – column or list of columns giving the foreign key on the related table
    - on – shortcut when primary_on and related_on are identical
    - how (Union[Literal['inner'], Literal['outer']]) – {inner|outer}. NB: outer provides only outer rows on the related table
    - kwds (Any) – arguments to pass to the join function
- __init__(*, how='inner', fillna=None, inv_mask=None, **kwds)#
- create_dependent_modules(primary_module, related_module, *, primary_slot='result', related_slot='result', primary_cols=None, related_cols=None, on=None, primary_on=None, related_on=None, suffix='')#
  - Parameters:
    - primary_module (Module) – module providing the primary data source (primary key owner)
    - related_module (Module) – module providing the related data source (foreign key owner)
    - primary_slot (str) – …
    - related_slot (str) – …
    - primary_cols (Optional[List[str]]) – primary table (virtual) columns to be included in the output view
    - related_cols (Optional[List[str]]) – related table columns to be included in the output view
    - primary_on (Union[str, List[str], None]) – column or list of columns giving the primary key on the primary table
    - related_on (Union[str, List[str], None]) – column or list of columns giving the foreign key on the related table
    - on (Union[str, List[str], None]) – shortcut when primary_on and related_on are identical
    - suffix (str) – …
  - Return type:
from progressivis.core import Sink, Scheduler
from progressivis.io import ParquetLoader, CSVLoader
from progressivis.table.join import Join
PARQUET_FILE = "path/to/yellow_tripdata_2015-01.parquet"
CSV_URL = "path/to/taxi+_zone_lookup.csv"
scheduler = Scheduler()
with scheduler:
parquet = ParquetLoader(PARQUET_FILE, scheduler=scheduler)
csv = CSVLoader(CSV_URL, scheduler=scheduler)
join = Join(how="inner", scheduler=scheduler)
join.create_dependent_modules(
related_module=parquet,
primary_module=csv,
related_on=["DOLocationID"],
primary_on=["LocationID"]
)
sink = Sink(scheduler=scheduler)
sink.input.inp = join.output.result
Group-By Module#
Aggregate Module#
Set and Flow Control Operations#
Intersection Module#
- class progressivis.table.intersection.Intersection#
  Intersection module. It computes the intersection of the indices of all its inputs and provides a view containing the rows shared by all input tables or views. ⚠️ All inputs must be based on the same physical table. The columns of the output table are given by the common physical table
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (BasePTable) – multiple: True. Many tables or views sharing the same physical table
  - Output slots:
    - result (PTableSelectedView) – View on the physical table shared by the inputs. Its index is the intersection of the input indices
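Since all inputs share the same physical table, the module only has to intersect row-id sets. An illustrative sketch of that index operation (not the progressive implementation):

```python
def intersection(*index_sets):
    """Intersect the row-id sets of several views over the same table."""
    result = set(index_sets[0])
    for s in index_sets[1:]:
        result &= set(s)
    return result

view_a = {0, 1, 2, 5}  # row ids selected by one view
view_b = {1, 2, 3}     # row ids selected by another view
shared = intersection(view_a, view_b)
```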
Merging dictionaries Module#
- class progressivis.table.merge_dict.MergeDict#
  Gathers many (dict) outputs into one, assuming there are no key clashes. Complementary to the Switch module
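The no-clash merge can be sketched as follows; an illustrative sketch, with the clash check made explicit rather than silently assumed:

```python
def merge_dicts(*dicts):
    """Gather several dict outputs into one; keys must not clash."""
    merged = {}
    for d in dicts:
        clash = merged.keys() & d.keys()
        if clash:
            raise ValueError(f"key clash: {clash}")
        merged.update(d)
    return merged

out = merge_dicts({"min": 0.1}, {"max": 9.9})
```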
Switching Module#
- class progressivis.table.switch.Switch#
  Switches the input to the result or result_else output at runtime, based on condition
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (PTable) – Data input
  - Output slots:
    - result (PTableSelectedView) – replays the input if the condition is satisfied
    - result_else (PTableSelectedView) – replays the input if the condition fails
- __init__(condition, **kwds)#
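The two-way routing can be sketched like this; an illustrative sketch of the condition-based dispatch, not the module's slot machinery:

```python
def switch(rows, condition):
    """Route each row to result or result_else depending on the condition."""
    result, result_else = [], []
    for row in rows:
        (result if condition(row) else result_else).append(row)
    return result, result_else

kept, rejected = switch([1, -2, 3], lambda v: v > 0)
```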
Statistical Modules#
Histograms#
- class progressivis.stats.histogram1d.Histogram1D#
  Computes the histogram of a scalar, numerical column in the input table
  - Module parameters:
    - bins (int64) – default: 128, the number of equal-width bins in the range given by the min/max inputs
    - intervaldelta (float64) – default: -5, tolerance threshold for variations in the min/max values at which the bounds are changed. Negative values represent a percentage, positive values are absolute
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
  - Output slots:
    - result (PDict) – the output table. Its datashape is { array: var * int32, min: float64, max: float64, time: int64 }
- __init__(column, **kwds)#
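Equal-width binning over a min/max range can be sketched in pure Python; an illustrative sketch (the actual module works incrementally and rebins when the bounds drift beyond intervaldelta):

```python
def histogram1d(values, lo, hi, bins=4):
    """Count values into equal-width bins over [lo, hi]."""
    counts = [0] * bins
    width = (hi - lo) / bins
    for v in values:
        if lo <= v <= hi:
            # clamp so v == hi falls into the last bin
            counts[min(int((v - lo) / width), bins - 1)] += 1
    return counts

counts = histogram1d([0.1, 0.2, 0.6, 0.9, 1.0], 0.0, 1.0, bins=2)
```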
- class progressivis.stats.histogram2d.Histogram2D#
  Computes the 2D histogram of two scalar, numerical columns in the input table. These two columns are referred to as x_column and y_column here.
  - Module parameters:
    - xbins (int64) – default: 256, the number of bins (as defined for Histogram1D) over the x axis
    - ybins (int64) – default: 256, the number of bins (as defined for Histogram1D) over the y axis
    - xdelta (float64) – default: -5, the delta threshold (as defined for Histogram1D) over the x axis
    - ydelta (float64) – default: -5, the delta threshold (as defined for Histogram1D) over the y axis
    - history (int64) – default: 3
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
  - Output slots:
    - result (PTable) – the output table
- __init__(x_column, y_column, with_output=True, **kwds)#
- class progressivis.stats.histogram1d_categorical.Histogram1DCategorical#
  Computes the histogram (i.e. the bar chart) of a categorical column in the input table
Max / IdxMax#
- class progressivis.stats.max.Max#
  Computes the maximum of the values for every column of an input table.
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (PTable) – accepts Sequence[str] as hint. Data entry. A selection of columns to be processed can be provided via a hint (see example). When no hint is provided, all input columns are processed
  - Output slots:
    - result (PDict) – maximum values dictionary where every key represents a column
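The per-column maximum dictionary can be maintained incrementally, one chunk at a time; an illustrative sketch of that running-update behavior, not the Max module itself:

```python
def update_max(acc, chunk):
    """Update the per-column maxima dict with a new chunk of rows."""
    for row in chunk:
        for col, v in row.items():
            if col not in acc or v > acc[col]:
                acc[col] = v
    return acc

acc = {}
update_max(acc, [{"a": 1, "b": 9}])  # first step
update_max(acc, [{"a": 4, "b": 2}])  # later step refines the result
```

After each step, acc is a valid approximation of the final result, which is the defining property of a progressive aggregate.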
- class progressivis.stats.idxmax.IdxMax#
  Computes the indices of the maximum of the values for every column
Min / IdxMin#
- class progressivis.stats.min.Min#
  Computes the minimum of the values for every column
- class progressivis.stats.idxmin.IdxMin#
  - Module parameters:
    - history (int64) – default: 33, the number of successive results to be kept
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Input slots:
    - table (PTable) – accepts Sequence[str] as hint. The input table
  - Output slots:
- __init__(**kwds)#
Random sampling#
- class progressivis.stats.random_table.RandomPTable#
  Random table generator module
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Output slots:
    - result (PTable) – result table
- __init__(columns, rows=-1, random=<built-in method rand of numpy.random.mtrand.RandomState object>, dtype='float64', throttle=False, **kwds)#
  - Parameters:
    - columns (Union[int, Sequence[str]]) – columns definition:
      - if it is an int, it gives the number (n) of columns, named _1 … _n
      - otherwise it gives the sequence of column names
    - rows (int) – if positive (= n), stops generation after n rows, otherwise unbounded
    - random (Callable[..., ndarray[Any, Any]]) – numpy random function, default is numpy.random.rand
    - dtype (str) – numpy-like data type
    - throttle (Union[int, integer[Any], float, bool]) – limit the number of rows to be generated in a step
    - kwds (Any) – extra keyword args to be passed to the Module superclass
- class progressivis.stats.blobs_table.BlobsPTable#
  Isotropic Gaussian blobs table generator based on the sklearn.datasets.make_blobs function
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Output slots:
- __init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, centers, **kwds)#
  - Parameters:
    - columns (Union[int, List[str], ndarray[Any, Any]]) – columns definition:
      - if it is an int, it gives the number (n) of columns, named _1 … _n
      - otherwise it gives the sequence of column names
    - rows (int) – if positive (= n), stops generation after n rows, otherwise unbounded
    - dtype (str) – numpy-like data type
    - seed (int) – used to initialize the random number generator
    - throttle (Union[int, bool, float]) – limit the number of rows to be generated in a step
    - centers (Any) – number of centers to generate, or the fixed center locations
    - kwds (Any) – extra keyword args to be passed to sklearn.datasets.make_blobs
- class progressivis.stats.blobs_table.MVBlobsPTable#
  Multivariate normal distribution blobs table generator based on the numpy.random.multivariate_normal function
  - Module parameters:
    - quantum (float64) – default: 0.5
    - debug (bool) – default: False
  - Output slots:
- __init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, means, covs, **kwds)#
  - Parameters:
    - columns (Union[int, List[str], ndarray[Any, Any]]) – columns definition:
      - if it is an int, it gives the number (n) of columns, named _1 … _n
      - otherwise it gives the sequence of column names
    - rows (int) – if positive (= n), stops generation after n rows, otherwise unbounded
    - dtype (str) – numpy-like data type
    - seed (int) – used to initialize the random number generator
    - throttle (Union[int, bool, float]) – limit the number of rows to be generated in a step
    - means (Any) – sequence of 1-D array_likes of length N, for N-dimensional blobs
    - covs (Any) – sequence of 2-D array_likes of shape (N, N), the covariance matrices of the distributions for N-dimensional blobs
    - kwds (Any) – extra keyword args to be passed to numpy.random.multivariate_normal
Stats#
- class progressivis.stats.stats.Stats#
Computes the minimum and the maximum of the values for a single column
- Module parameters:
history (
int64
) β default: 3, the number of successive results to be kept quantum (
float64
) β default: 0.5debug (
bool
) β default: False
- Input slots:
table (
PTable
) β π the input table
- Output slots:
result (
PTable
) β π result table containing two columns (for min and max value)
- __init__(column, min_column=None, max_column=None, reset_index=False, **kwds)#
- Parameters:
column (
Union
[str
,int
]) β the name or the position of the column to be processedmin_column (
Optional
[str
]) β the name of the minimum column in theresult
table. When missing, the name defaults to _<column>_min
max_column β the name of the maximum column in the
result
table. When missing, the name defaults to _<column>_max
kwds (
Any
) β extra keyword args to be passed to theModule
superclass
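The progressive min/max computation can be sketched without ProgressiVis: each step folds a new chunk of rows into the running bounds. The chunking and data below are illustrative assumptions:

```python
import numpy as np

# Progressive min/max over a single column, processed chunk by chunk,
# mimicking what the Stats module does at each scheduler step.
rng = np.random.default_rng(42)
column = rng.normal(size=1000)
cur_min, cur_max = np.inf, -np.inf
for chunk in np.array_split(column, 10):  # 10 progressive steps
    cur_min = min(cur_min, chunk.min())
    cur_max = max(cur_max, chunk.max())
# After all steps, the running bounds equal the exact values
assert cur_min == column.min() and cur_max == column.max()
```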
Sample#
- class progressivis.stats.sample.Sample#
Reservoir Sampling module
- Module parameters:
samples (
int64
) β default: 50quantum (
float64
) β default: 0.5debug (
bool
) β default: False
- Input slots:
table (
PTable
)
- Output slots:
result (
PTableSelectedView
)select (
PIntSet
) β required: False, attr_name: pintset, custom_attr: True
Variance#
- class progressivis.stats.var.Var#
Computes the variance of every column of an input table.
- class progressivis.stats.var.VarH#
Computes the variance of every column of an input table. This variant keeps a history.
Clustering Modules#
Linear Algebra Modules#
Element-wise processing modules#
These modules apply numpy universal functions (a.k.a. ufunc) to the columns of one or two input tables.
Depending on the arity of the applied ufunc, we distinguish several categories of modules.
Unary modules#
These modules apply a unary ufunc on all columns or a subset of columns from the input table, for example:
- class progressivis.linalg.Absolute#
Applies
numpy.absolute
over all input columns or over a subset of them.
- Module parameters:
quantum (
float64
) β default: 0.5debug (
bool
) β default: False
- Input slots:
table (
PTable
) β acceptsSequence[str]
as hint π Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed
- Output slots:
result (
PTable
) β required: False, datashape: {βtableβ: β#columnsβ} π The output table follows the structure oftable
The other unary modules have the same interface as Absolute
module above. They are:
Module name | underlying Universal Function
---|---
Absolute | numpy.absolute
Arccos | numpy.arccos
Arccosh | numpy.arccosh
Arcsin | numpy.arcsin
Arcsinh | numpy.arcsinh
Arctan | numpy.arctan
Arctanh | numpy.arctanh
Cbrt | numpy.cbrt
Ceil | numpy.ceil
Conjugate | numpy.conjugate
Cos | numpy.cos
Cosh | numpy.cosh
Deg2rad | numpy.deg2rad
Degrees | numpy.degrees
Exp | numpy.exp
Exp2 | numpy.exp2
Expm1 | numpy.expm1
Fabs | numpy.fabs
Floor | numpy.floor
Frexp | numpy.frexp
Invert | numpy.invert
Isfinite | numpy.isfinite
Isinf | numpy.isinf
Isnan | numpy.isnan
Isnat | numpy.isnat
Log | numpy.log
Log10 | numpy.log10
Log1p | numpy.log1p
Log2 | numpy.log2
LogicalNot | numpy.logical_not
Modf | numpy.modf
Negative | numpy.negative
Positive | numpy.positive
Rad2deg | numpy.rad2deg
Radians | numpy.radians
Reciprocal | numpy.reciprocal
Rint | numpy.rint
Sign | numpy.sign
Signbit | numpy.signbit
Sin | numpy.sin
Sinh | numpy.sinh
Spacing | numpy.spacing
Sqrt | numpy.sqrt
Square | numpy.square
Tan | numpy.tan
Tanh | numpy.tanh
Trunc | numpy.trunc
Binary modules#
These modules apply a binary ufunc on two sets of columns belonging to the same input table (for example ColsAdd
below) or to two distinct tables (for example Add
below):
- class progressivis.linalg.Add#
Applies
numpy.add
over two sets of columns. One of them belongs to thefirst
input table and the other to the second input table.
Examples:
from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.linalg import Add
#
# each column in random1 is added to the column in random2 at the same position
#
scheduler = Scheduler()
with scheduler:
    random1 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result
    module.input.second = random2.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
#
# columns _3, _5, _7 in random1 are added to columns _4, _6, _8 in random2
#
scheduler = Scheduler()
with scheduler:
    random1 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result["_3", "_5", "_7"]
    module.input.second = random2.output.result["_4", "_6", "_8"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
- class progressivis.linalg.ColsAdd#
Example:
from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.linalg import ColsAdd
#
# columns _3, _5, _7 are added to columns _4, _6, _8
#
scheduler = Scheduler()
with scheduler:
    random = RandomPTable(3, rows=100_000, scheduler=scheduler)
    module = ColsAdd(
        cols_out=["x", "y", "z"],
        scheduler=scheduler,
    )
    module.input.table = random.output.result[["_3", "_5", "_7"], ["_4", "_6", "_8"]]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
The other binary modules have the same interface as Add
and ColsAdd
modules above. They are:
Module name | underlying Universal Function
---|---
Add / ColsAdd / AddReduce | numpy.add
Arctan2 / ColsArctan2 / Arctan2Reduce | numpy.arctan2
BitwiseAnd / ColsBitwiseAnd / BitwiseAndReduce | numpy.bitwise_and
BitwiseOr / ColsBitwiseOr / BitwiseOrReduce | numpy.bitwise_or
BitwiseXor / ColsBitwiseXor / BitwiseXorReduce | numpy.bitwise_xor
Copysign / ColsCopysign / CopysignReduce | numpy.copysign
Divide / ColsDivide / DivideReduce | numpy.divide
Divmod / ColsDivmod / DivmodReduce | numpy.divmod
Equal / ColsEqual / EqualReduce | numpy.equal
FloatPower / ColsFloatPower / FloatPowerReduce | numpy.float_power
FloorDivide / ColsFloorDivide / FloorDivideReduce | numpy.floor_divide
Fmax / ColsFmax / FmaxReduce | numpy.fmax
Fmin / ColsFmin / FminReduce | numpy.fmin
Fmod / ColsFmod / FmodReduce | numpy.fmod
Gcd / ColsGcd / GcdReduce | numpy.gcd
Greater / ColsGreater / GreaterReduce | numpy.greater
GreaterEqual / ColsGreaterEqual / GreaterEqualReduce | numpy.greater_equal
Heaviside / ColsHeaviside / HeavisideReduce | numpy.heaviside
Hypot / ColsHypot / HypotReduce | numpy.hypot
Lcm / ColsLcm / LcmReduce | numpy.lcm
Ldexp / ColsLdexp / LdexpReduce | numpy.ldexp
LeftShift / ColsLeftShift / LeftShiftReduce | numpy.left_shift
Less / ColsLess / LessReduce | numpy.less
LessEqual / ColsLessEqual / LessEqualReduce | numpy.less_equal
Logaddexp / ColsLogaddexp / LogaddexpReduce | numpy.logaddexp
Logaddexp2 / ColsLogaddexp2 / Logaddexp2Reduce | numpy.logaddexp2
LogicalAnd / ColsLogicalAnd / LogicalAndReduce | numpy.logical_and
LogicalOr / ColsLogicalOr / LogicalOrReduce | numpy.logical_or
LogicalXor / ColsLogicalXor / LogicalXorReduce | numpy.logical_xor
Maximum / ColsMaximum / MaximumReduce | numpy.maximum
Minimum / ColsMinimum / MinimumReduce | numpy.minimum
Mod / ColsMod / ModReduce | numpy.mod
Multiply / ColsMultiply / MultiplyReduce | numpy.multiply
Nextafter / ColsNextafter / NextafterReduce | numpy.nextafter
NotEqual / ColsNotEqual / NotEqualReduce | numpy.not_equal
Power / ColsPower / PowerReduce | numpy.power
Remainder / ColsRemainder / RemainderReduce | numpy.remainder
RightShift / ColsRightShift / RightShiftReduce | numpy.right_shift
Subtract / ColsSubtract / SubtractReduce | numpy.subtract
TrueDivide / ColsTrueDivide / TrueDivideReduce | numpy.true_divide
Reduce modules#
These modules reduce the input table column dimension by one, by applying a ufunc, for example:
- class progressivis.linalg.AddReduce#
Applies
numpy.add.reduce
over all input columns or over a subset of them.
- Module parameters:
quantum (
float64
) β default: 0.5debug (
bool
) β default: False
- Input slots:
table (
PTable
) β acceptsSequence[str]
as hint π Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed
- Output slots:
result (
PDict
) β π The key names follow the input table columns
The other reduce modules have the same interface as the AddReduce
module above. They are listed in the previous table.
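The per-column reduction can be illustrated directly with numpy.add.reduce, which sums along the row axis, leaving one scalar per column; the table values below are made up:

```python
import numpy as np

# numpy.add.reduce collapses the row dimension: a (rows, cols) table
# becomes one scalar per column, which a reduce module exposes as a dict.
table = np.arange(12, dtype="float64").reshape(4, 3)  # 4 rows, 3 columns
reduced = np.add.reduce(table, axis=0)
print(reduced)  # [18. 22. 26.]
```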
Decorators#
One can transform a simple function into an element-wise module via three decorators: @unary_module,
@binary_module
and @reduce_module:
Examples:
import numpy as np
from progressivis.linalg import unary_module, binary_module, reduce_module
from typing import cast
@unary_module
def CustomUnary(x: float) -> float:
    return cast(float, (x + np.sin(x)) / (x + np.cos(x)))

@binary_module
def CustomBinary(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))

@reduce_module
def CustomBinaryReduce(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))
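In plain numpy, such a scalar function can be vectorized with numpy.frompyfunc, which is presumably the kind of mechanism these decorators build on; the check below is only a sketch of that vectorization, not of the module machinery itself:

```python
import numpy as np

def custom_unary(x: float) -> float:
    return (x + np.sin(x)) / (x + np.cos(x))

# numpy.frompyfunc turns the scalar function into an element-wise ufunc
custom_ufunc = np.frompyfunc(custom_unary, 1, 1)
values = np.array([1.0, 2.0, 3.0])
result = custom_ufunc(values).astype("float64")
expected = np.array([custom_unary(v) for v in values])
assert np.allclose(result, expected)
```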
Declarative modules#
One can create new modules in a declarative way as subclasses of linalg.mixufunc.MixUfuncABC
.
Examples:
import numpy as np
from progressivis.linalg.mixufunc import MixUfuncABC
from progressivis.table.table import PTable
from progressivis.core.module import def_input, def_output
from typing import Any
@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class MixUfuncSample(MixUfuncABC):
    """
    Explanation: the result table has two columns "_1" and "_2" which are calculated
    with the underlying expressions.
    NB: here, columns in the first and second tables are supposed to be _1, _2, ...
    """
    expr = {"_1": (np.add, "first._2", "second._3"), "_2": (np.log, "second._3")}

@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncSample2(MixUfuncABC):
    """
    The output types can be coerced if necessary
    """
    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }

def custom_unary(x: float) -> float:
    return (x + np.sin(x)) / (x + np.cos(x))  # type: ignore

custom_unary_ufunc: Any = np.frompyfunc(custom_unary, 1, 1)

@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomUnary(MixUfuncABC):
    """
    Module using a custom unary function
    """
    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (custom_unary_ufunc, "second._3"),
    }

def custom_binary(x: float, y: float) -> float:
    return (x + np.sin(y)) / (x + np.cos(y))  # type: ignore

custom_binary_ufunc: Any = np.frompyfunc(custom_binary, 2, 1)

@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomBinary(MixUfuncABC):
    """
    Module using a custom binary function
    """
    expr = {
        "_1:float64": (custom_binary_ufunc, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }
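Ignoring the progressive machinery, an expr mapping like the ones above amounts to evaluating each ufunc on the referenced input columns. A plain-numpy sketch of that evaluation, with made-up column data standing in for first._2 and second._3:

```python
import numpy as np

# Hypothetical input columns, standing in for first._2 and second._3
first_2 = np.array([1.0, 2.0, 3.0])
second_3 = np.array([4.0, 5.0, 6.0])

# Each entry maps an output column name to (ufunc, *input columns)
expr = {"_1": (np.add, first_2, second_3), "_2": (np.log, second_3)}
result = {name: ufunc(*args) for name, (ufunc, *args) in expr.items()}
print(result["_1"])  # [5. 7. 9.]
```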
Declarative modules based on numexpr
expressions can be created using NumExprABC
this way:
from progressivis.linalg.nexpr import NumExprABC
from progressivis.table.table import PTable
from progressivis.core.module import def_input, def_output
@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class NumExprSample(NumExprABC):
    """
    NB: here, columns in the first and second tables are supposed to be _1, _2, ...
    """
    expr = {"_1": "{first._2}+2*{second._3}", "_2": "{first._3}-5*{second._2}"}

@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class NumExprSample2(NumExprABC):
    """
    The output types can be coerced if necessary
    """
    expr = {
        "_1:float64": "{first._2}+2*{second._3}",
        "_2:float64": "{first._3}-5*{second._2}",
    }
Linear mapping#
Linear transformation can be performed via this module:
- class progressivis.linalg.linear_map.LinearMap#
Performs a linear transformation on rows in
vectors
table. Thetransformation
table (or a subset of its columns) provides the transformation matrix once all its rows are read. Its number of rows must equal the number of columns of vectors
.
- Module parameters:
quantum (
float64
) β default: 0.5debug (
bool
) β default: False
- Input slots:
vectors (
PTable
) β acceptsSequence[str]
as hint π Table providing the row vectors. A selection of columns could be provided via a hint (see example). When no hint is provided all input columns are used transformation (
PTable
) β acceptsSequence[str]
as hint π Table providing the transformation matrix. A selection of columns could be provided via a hint (same behaviour as vectors
)
- Output slots:
result (
PTable
) β π Result table, follows the structure oftransformation
Example:
from progressivis.core import Sink, Scheduler
from progressivis.linalg.linear_map import LinearMap
from progressivis.stats import RandomPTable
scheduler = Scheduler()
with scheduler:
    vectors = RandomPTable(20, rows=100000, scheduler=scheduler)
    transf = RandomPTable(20, rows=3, scheduler=scheduler)
    module = LinearMap(scheduler=scheduler)
    module.input.vectors = vectors.output.result["_3", "_4", "_5"]
    module.input.transformation = transf.output.result["_4", "_5", "_6", "_7"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
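The underlying computation is an ordinary matrix product: with 3 selected vector columns and a 3-row, 4-column transformation block, each input row maps to a 4-component output row. A numpy sketch of that contract, with random data standing in for the two tables:

```python
import numpy as np

rng = np.random.default_rng(0)
vectors = rng.random((100, 3))       # 100 rows, 3 selected columns
transformation = rng.random((3, 4))  # its row count matches vectors' column count
result = vectors @ transformation    # each row is linearly mapped to 4 components
print(result.shape)  # (100, 4)
```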