Module Library

I/O Modules

Simple CSV Loader

class progressivis.io.simple_csv_loader.SimpleCSVLoader

This module reads comma-separated values (csv) files progressively into a PTable. Optionally, it provides information about missing values and anomalies via the anomalies and missing output slots. Internally it uses pandas.read_csv.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

filenames (PTable) – required: False 📔 Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ filenames slot and filepath_or_buffer init parameter cannot both be defined!

Output slots:
  • result (PTable) – 📔 Provides the loaded data as a PTable object

  • anomalies (PDict) – required: False 📔 provides invalid values as: anomalies[id][column] = <invalid-value>

  • missing (PDict) – required: False 📔 provides missing values as: missing[column] = <set-of-ids>
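As an illustration of the two optional outputs, the following pure-Python sketch builds dictionaries with the documented layouts (anomalies[id][column] = <invalid-value> and missing[column] = <set-of-ids>) from rows that have already been split into fields. The helper collect_anomalies, the per-column parser mapping and the use of row positions as ids are hypothetical; this is not how SimpleCSVLoader is implemented internally.

```python
from typing import Any, Callable, Dict, List, Set, Tuple


def collect_anomalies(
    rows: List[Dict[str, str]],
    parsers: Dict[str, Callable[[str], Any]],
) -> Tuple[Dict[int, Dict[str, str]], Dict[str, Set[int]]]:
    """Hypothetical helper: row ids are simply positions in `rows`."""
    anomalies: Dict[int, Dict[str, str]] = {}
    missing: Dict[str, Set[int]] = {}
    for row_id, row in enumerate(rows):
        for column, parse in parsers.items():
            raw = row.get(column, "")
            if raw == "":
                # empty field: record the row id under this column
                missing.setdefault(column, set()).add(row_id)
                continue
            try:
                parse(raw)
            except ValueError:
                # unparsable field: keep the raw invalid value
                anomalies.setdefault(row_id, {})[column] = raw
    return anomalies, missing


rows = [{"a": "1", "b": "2.5"}, {"a": "x", "b": ""}, {"a": "", "b": "3"}]
anomalies, missing = collect_anomalies(rows, {"a": int, "b": float})
print(anomalies)  # {1: {'a': 'x'}}
print(missing)    # {'b': {1}, 'a': {2}}
```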

__init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, imputer=None, **kwds)
Parameters:
  • filepath_or_buffer (Optional[Any]) – str, path object or file-like object accepted by pandas.read_csv

  • filter_ (Optional[Callable[[DataFrame], DataFrame]]) –

    filtering function to be applied on input data at loading time

    Example

    >>> def filter_(df):
    ...     lon = df['dropoff_longitude']
    ...     lat = df['dropoff_latitude']
    ...     return df[(lon>-74.10)&(lon<-73.7)&(lat>40.60)&(lat<41)]
    

  • force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition

  • fillvalues (Optional[Dict[str, Any]]) – the default values of the columns specified as a dictionary (see PTable)

  • throttle (Union[bool, int, float]) – limit the number of rows to be loaded in a step

  • imputer (Optional[SimpleImputer]) – a SimpleImputer provides basic strategies for imputing missing values

  • kwds (Any) – extra keyword args to be passed to pandas.read_csv and Module superclass
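The effect of throttle can be pictured with a minimal sketch that reads a CSV source in batches, one batch per simulated run step, capping each batch at throttle rows. It uses only the standard csv module; the function name progressive_chunks, the default_chunk fallback and the treatment of a boolean throttle as "no explicit cap" are assumptions made for the illustration, not SimpleCSVLoader behavior.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterable, Iterator, List, Union


def progressive_chunks(
    lines: Iterable[str],
    throttle: Union[bool, int, float] = False,
    default_chunk: int = 1000,
) -> Iterator[List[Dict[str, str]]]:
    """Yield successive batches of parsed rows (hypothetical sketch)."""
    reader = csv.DictReader(lines)
    # Assumption: a bool or a non-positive value means "no explicit cap".
    if isinstance(throttle, bool) or throttle <= 0:
        size = default_chunk
    else:
        size = max(1, int(throttle))  # never read zero rows per step
    while True:
        batch = list(islice(reader, size))  # at most `size` rows per step
        if not batch:
            return
        yield batch


data = io.StringIO("x,y\n1,2\n3,4\n5,6\n")
sizes = [len(batch) for batch in progressive_chunks(data, throttle=2)]
print(sizes)  # [2, 1]
```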

Reliable CSV Loader

class progressivis.io.csv_loader.CSVLoader

This module reads comma-separated values (csv) files progressively into a PTable. It is able to resume the reading after a network failure or a crash. ⚠️ This module does not wait for filenames!
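The resume-after-failure idea can be sketched in a few lines of plain Python: the reader records how many rows it has consumed every save_step_size rows and, when recovery is requested, discards the rows loaded after the last snapshot before continuing from that position. The function load_with_checkpoints, the context dictionary and the truncation policy are hypothetical; CSVLoader's actual snapshot mechanism is not shown here.

```python
import csv
import io
from typing import Dict, List, Optional


def load_with_checkpoints(
    text: str,
    table: List[List[str]],
    context: Dict[str, int],
    save_step_size: int,
    recovery: bool = False,
    fail_after: Optional[int] = None,
) -> None:
    """Append the data rows of `text` to `table`, snapshotting progress in `context`.

    `fail_after` simulates a crash after that many rows have been read by this call.
    """
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    if recovery:
        start = context.get("rows_done", 0)
        del table[start:]  # rows read after the last snapshot are not trusted
    else:
        start = 0
        table.clear()
        context.clear()
    read_now = 0
    for index, row in enumerate(reader):
        if index < start:
            continue  # already covered by the last snapshot
        if fail_after is not None and read_now == fail_after:
            raise ConnectionError("simulated network failure")
        table.append(row)
        read_now += 1
        if (index + 1) % save_step_size == 0:
            context["rows_done"] = index + 1  # snapshot of the reading position


text = "v\n" + "".join(f"{i}\n" for i in range(10))
table: List[List[str]] = []
context: Dict[str, int] = {}
try:
    load_with_checkpoints(text, table, context, save_step_size=3, fail_after=7)
except ConnectionError:
    pass  # 7 rows were read, the last snapshot was taken at 6 rows
load_with_checkpoints(text, table, context, save_step_size=3, recovery=True)
print([row[0] for row in table])  # every value from '0' to '9' exactly once
```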

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

filenames (PTable) – required: False 📔 Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ filenames slot and filepath_or_buffer init parameter cannot both be defined!

Output slots:

result (PTable) – 📔 Provides the loaded data as a PTable object

__init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, as_array=None, timeout=None, save_context=True, recovery=False, recovery_tag='', recovery_table_size=3, save_step_size=100000, **kwds)
Parameters:
  • filepath_or_buffer (Optional[Any]) –

    str, path object or file-like object accepted by pandas.read_csv

    NB: filenames slot and filepath_or_buffer init parameter cannot both be defined

  • filter_ (Optional[Callable[[DataFrame], DataFrame]]) –

    filtering function to be applied on input data at loading time

    Example

    >>> def filter_(df):
    ...     lon = df['dropoff_longitude']
    ...     lat = df['dropoff_latitude']
    ...     return df[(lon>-74.10)&(lon<-73.7)&(lat>40.60)&(lat<41)]
    

  • force_valid_ids (bool) – force renaming of columns to make their names valid identifiers according to the language definition

  • fillvalues (Optional[Dict[str, Any]]) – the default values of the columns specified as a dictionary (see PTable)

  • as_array (Union[str, Dict[str, List[str]], Callable[[List[str]], Dict[str, List[str]]], None]) –

    allows lists of n input columns to be grouped into output n-D columns (grouped column types must be identical):

    • when as_array is a Dict[str, List[str]] every entry represents a grouping rule

      Example:
      >>> CSVLoader(
      ...             get_dataset("bigfile"),
      ...             index_col=False,
      ...             as_array={
      ...                 "first_group": ["_1", "_2", "_3"],
      ...                 "second_group": ["_11", "_12", "_13"],
      ...             },
      ...             header=None,
      ...             scheduler=s,
      ...         )
      
    • when as_array is a str it gives the name of a unique n-D output column which groups all input columns

    • when as_array is a Callable it must be able to process the list of all input columns and return a Dict[str, List[str]]

      Example:
      >>> CSVLoader(
      ...                 get_dataset("mnist_784"),
      ...                 index_col=False,
      ...                 as_array=lambda cols: {"array": [
      ...                     c for c in cols if c != "class"]},
      ...                 scheduler=s,
      ...             )
      

  • timeout (Optional[float]) – stop waiting for a response and raise an exception after a given number of seconds

  • save_context (bool) –

    if True, saves information allowing recovery after a failure, provided that filepath_or_buffer designates a recoverable source (i.e. a file path)

    NB: currently backup only works when data input is provided via filepath_or_buffer

  • recovery (bool) – when False (default), the data is read from the beginning; otherwise the previous reading is resumed using the saved information

  • recovery_tag (Union[str, int]) – customize save tags (useful mainly for debugging)

  • recovery_table_size (int) – defines the size of the recovery table (useful mainly for debugging)

  • save_step_size (int) – defines the number of rows to read between two context snapshots

  • kwds (Any) – extra keyword args to be passed to pandas.read_csv and Module superclass
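The three accepted forms of as_array can all be reduced to the same Dict[str, List[str]] grouping rules. The sketch below shows one way to do that normalization and what a grouped row then looks like; normalize_as_array and group_row are hypothetical helpers, and keeping ungrouped columns as scalars and treating None as "no grouping" are assumptions, not documented CSVLoader behavior.

```python
from typing import Any, Callable, Dict, List, Union

AsArray = Union[str, Dict[str, List[str]], Callable[[List[str]], Dict[str, List[str]]], None]


def normalize_as_array(as_array: AsArray, columns: List[str]) -> Dict[str, List[str]]:
    """Turn any accepted as_array value into {output_column: [input_columns]}."""
    if as_array is None:
        return {}  # assumption: no grouping at all
    if isinstance(as_array, str):
        return {as_array: list(columns)}  # one n-D column gathering every input column
    if isinstance(as_array, dict):
        return dict(as_array)  # already a set of grouping rules
    if callable(as_array):
        return as_array(columns)  # the callable computes the rules from the column list
    raise TypeError(f"unsupported as_array value: {as_array!r}")


def group_row(row: Dict[str, Any], rules: Dict[str, List[str]]) -> Dict[str, Any]:
    """Apply the grouping rules to one row; ungrouped columns stay scalar."""
    grouped: Dict[str, Any] = {name: [row[c] for c in cols] for name, cols in rules.items()}
    used = {c for cols in rules.values() for c in cols}
    grouped.update({c: v for c, v in row.items() if c not in used})
    return grouped


columns = ["_1", "_2", "_3", "class"]
rules = normalize_as_array(lambda cols: {"array": [c for c in cols if c != "class"]}, columns)
print(rules)  # {'array': ['_1', '_2', '_3']}
print(group_row({"_1": 0.1, "_2": 0.2, "_3": 0.3, "class": 7}, rules))
# {'array': [0.1, 0.2, 0.3], 'class': 7}
```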

CSV Loader via PyArrow

class progressivis.io.pa_csv_loader.PACSVLoader
This module reads comma-separated values (csv) files progressively into a PTable using the PyArrow backend.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

filenames (PTable) – required: False 📔 Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ filenames slot and filepath_or_buffer init parameter cannot both be defined!

Output slots:
  • anomalies (PDict) – required: False 📔 provides: anomalies['skipped_cnt'] = <skipped-rows-cnt> and anomalies['invalid_values'] = {column: <set-of-invalid-values> for column in <columns-subset>}

  • result (PTable) – 📔 Provides the loaded data as a PTable object

__init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, read_options=None, parse_options=None, convert_options=None, drop_na=True, **kwds)
Parameters:
property parser: CSVStreamingReader

When data contains invalid values, pyarrow.csv.open_csv can raise an ArrowInvalid exception (even though no rows have been fetched yet …), so a late instantiation of the CSVStreamingReader is preferred.
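The late-instantiation pattern can be sketched with functools.cached_property: the reader is only created on first access, so an error raised while opening surfaces at the first read instead of at construction time, and since cached_property stores nothing when the getter raises, a later access simply retries. LazyReaderHolder and its open_reader factory are hypothetical stand-ins for the module and pyarrow.csv.open_csv.

```python
from functools import cached_property
from typing import Callable, Iterator, List


class LazyReaderHolder:
    """Creates its reader on first access only (hypothetical sketch)."""

    def __init__(self, open_reader: Callable[[], Iterator[List[str]]]) -> None:
        self._open_reader = open_reader
        self.opened = False

    @cached_property
    def parser(self) -> Iterator[List[str]]:
        # Runs on the first access; if open_reader raises, nothing is cached.
        reader = self._open_reader()
        self.opened = True
        return reader


holder = LazyReaderHolder(lambda: iter([["1", "2"], ["3", "4"]]))
print(holder.opened)        # False: nothing opened yet
print(next(holder.parser))  # ['1', '2']
print(holder.opened)        # True
```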

get_progress()

Return a tuple of numbers (current, total) where current is the current progress value and total is the total number of values to process; these values can change during the computation.

Return type:

Tuple[int, int]

recovering()

Transforms invalid values into NA values

Return type:

RecordBatch
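A plain-Python picture of what transforming invalid values into NA values amounts to: each column is parsed value by value, and values that fail to parse become None. A dict of string columns stands in for the pyarrow RecordBatch and None stands for NA; recover_column and recover_batch are hypothetical helpers, not the module's method.

```python
from typing import Any, Callable, Dict, List, Optional


def recover_column(values: List[str], parse: Callable[[str], Any]) -> List[Optional[Any]]:
    """Parse each value; any value that fails to parse becomes None (NA)."""
    out: List[Optional[Any]] = []
    for raw in values:
        try:
            out.append(parse(raw))
        except ValueError:
            out.append(None)  # invalid value replaced by a missing value
    return out


def recover_batch(
    batch: Dict[str, List[str]], parsers: Dict[str, Callable[[str], Any]]
) -> Dict[str, List[Optional[Any]]]:
    """Apply recover_column to every column; columns without a parser stay strings."""
    return {name: recover_column(col, parsers.get(name, str)) for name, col in batch.items()}


batch = {"n": ["1", "oops", "3"], "s": ["a", "b", "c"]}
print(recover_batch(batch, {"n": int}))
# {'n': [1, None, 3], 's': ['a', 'b', 'c']}
```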

Parquet Loader via PyArrow

class progressivis.io.parquet_loader.ParquetLoader
This module reads parquet files progressively into a PTable using the PyArrow backend.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

filenames (PTable) – required: False 📔 Files to read. The underlying PTable must have a filename column containing the file URIs. ⚠️ filenames slot and filepath_or_buffer init parameter cannot both be defined!

Output slots:
  • anomalies (PDict) – required: False 📔 provides: anomalies['skipped_cnt'] = <skipped-rows-cnt>

  • result (PTable) – 📔 Provides the loaded data as a PTable object

__init__(filepath_or_buffer=None, filter_=None, force_valid_ids=True, fillvalues=None, throttle=False, columns=None, **kwds)
Parameters:

Constant Module

class progressivis.table.constant.Constant

Module providing a constant output PTable slot

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:

result (PTable)

__init__(table, **kwds)
Parameters:

table (PTable) – object to be used by the result output slot

class progressivis.table.constant.ConstDict

Module providing a constant output PDict slot

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:

result (PDict)

__init__(pdict, **kwds)
Parameters:
  • pdict (PDict) – object to be used by the result output slot

  • kwds (Any) – extra keyword args to be passed to the Module superclass

Variable Module

class progressivis.io.variable.Variable

This module allows external events (coming from widgets, commands, etc.) to be taken into account

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:

result (PDict) – 📔 Returns the dictionary provided by from_input() after translation (if applicable)

__init__(init_val=None, translation=None, **kwds)
Parameters:
  • init_val (Optional[Dict[str, Any]]) – initial value for the result

  • translation (Optional[Dict[str, Any]]) – dictionaries provided via from_input() may contain multiple aliases for the same canonical key (e.g. pickup_latitude and dropoff_latitude as aliases for latitude). To handle this, translation should provide an alias-to-canonical-key mapping dictionary (e.g. {'pickup_latitude': 'latitude', 'dropoff_latitude': 'latitude'})

  • kwds (Any) – extra keyword args to be passed to Module superclass
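To show what the translation mapping does to an incoming message, here is a minimal sketch; translate is a hypothetical helper, and the "last alias wins" rule for messages carrying several aliases of one key is an assumption of the sketch.

```python
from typing import Any, Dict, Optional


def translate(message: Dict[str, Any], translation: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Replace alias keys by their canonical key; other keys are kept unchanged."""
    if not translation:
        return dict(message)
    # If several aliases of the same key are present, the last one wins here.
    return {translation.get(key, key): value for key, value in message.items()}


translation = {"pickup_latitude": "latitude", "dropoff_latitude": "latitude"}
print(translate({"pickup_latitude": 40.75, "zoom": 3}, translation))
# {'latitude': 40.75, 'zoom': 3}
```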

async from_input(input_)

Catch and process a message from an interaction

Return type:

str

Data Filtering Modules

Simple Filtering Module

class progressivis.table.simple_filter.SimpleFilter

Filtering module based on a simple condition of the form

<column> <operator> <value> where

<operator> := '>' | '>=' | '<' | '<='

Module parameters:
  • column (object) – default: unknown, filtering column (condition’s left operand)

  • op (object) – default: >, relational operator (one of '>', '>=', '<', '<=')

  • value_key (object) – default: "", see the value input below

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • table (PTable) – 📔 input table or view

  • value (PDict) – 📔 contains the condition’s right operand. If value_key is provided, the right operand is value[value_key]; otherwise the first value in the dict is used

  • hist (PTable) – 📔 BinningIndex module output connected to the same input/column. This mandatory input can be provided by the create_dependent_modules() method.

Output slots:

result (PTableSelectedView)

__init__(**kwds)
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

create_dependent_modules(input_module, input_slot, hist_index=None, **kwds)

Creates a default configuration.

Parameters:
  • input_module (Module) – the input module (see the example)

  • input_slot (str) – the input slot name (e.g. result)

  • hist_index (Optional[BinningIndex]) – optional histogram index. If not provided, a BinningIndex is created

  • kwds (Any) – extra keyword args to be passed to the Module superclass

Return type:

SimpleFilter
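The condition <column> <operator> <value>, together with the value_key rule of the value input, can be expressed as a short linear-scan sketch. It is only meant to make the semantics concrete: simple_filter and right_operand are hypothetical, and the module itself relies on the BinningIndex connected to hist rather than scanning every row.

```python
import operator
from typing import Any, Callable, Dict, List

OPS: Dict[str, Callable[[Any, Any], bool]] = {
    ">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le,
}


def right_operand(value: Dict[str, Any], value_key: str = "") -> Any:
    """value[value_key] when value_key is set, else the first value of the dict."""
    if value_key:
        return value[value_key]
    return next(iter(value.values()))


def simple_filter(rows: List[Dict[str, Any]], column: str, op: str,
                  value: Dict[str, Any], value_key: str = "") -> List[int]:
    """Indices of the rows satisfying <column> <op> <value>."""
    compare = OPS[op]
    bound = right_operand(value, value_key)
    return [i for i, row in enumerate(rows) if compare(row[column], bound)]


rows = [{"_1": 0.2}, {"_1": 0.7}, {"_1": 0.9}]
print(simple_filter(rows, "_1", ">", {"threshold": 0.5}))                      # [1, 2]
print(simple_filter(rows, "_1", "<=", {"a": 0.1, "b": 0.7}, value_key="b"))   # [0, 1]
```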

CMP Query Module (via NumExpr)

class progressivis.table.cmp_query.CmpQueryLast

Filtering module that applies to the rows of table a constraint based on all the values of the last row of cmp, of the form:

(table[:, col1] <op> last[col1]) <combine> (table[:, col2] <op> last[col2]) <combine> ... <combine> (table[:, colN] <op> last[colN])

where:

last is the last row of the cmp table

and

<op> := '>' | '>=' | '<' | '<=' | '==' | '!='

and

<combine> := 'and' | 'or' | 'xor'

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • table (PTable) – 📔 Data input to be filtered

  • cmp (PTable) – 📔 The last row of this table provides the values used by the filter

Output slots:

select (PIntSet) – required: False 📔 Selected indices

__init__(op='<', combine='and', **kwds)
Parameters:
  • op (str) – relational operator

  • combine (str) – logical connector

  • kwds (Any) – extra keyword args to be passed to the Module superclass
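The constraint above can be evaluated row by row with the operator module: one comparison per column of the last cmp row, folded with the logical connector. This linear-scan sketch (cmp_query_last is hypothetical) returns a list of selected indices where the module produces a PIntSet, and it does not use NumExpr.

```python
import operator
from functools import reduce
from typing import Any, Callable, Dict, List

OPS: Dict[str, Callable[[Any, Any], Any]] = {
    ">": operator.gt, ">=": operator.ge, "<": operator.lt,
    "<=": operator.le, "==": operator.eq, "!=": operator.ne,
}
COMBINE: Dict[str, Callable[[bool, bool], bool]] = {
    "and": operator.and_, "or": operator.or_, "xor": operator.xor,
}


def cmp_query_last(
    table: List[Dict[str, Any]], cmp: List[Dict[str, Any]], op: str = "<", combine: str = "and"
) -> List[int]:
    """Indices of table rows satisfying the constraint for every column of last."""
    last = cmp[-1]  # only the last row of cmp is used
    compare, connect = OPS[op], COMBINE[combine]
    selected = []
    for index, row in enumerate(table):
        # one boolean per column, folded left to right with the connector
        tests = [bool(compare(row[col], bound)) for col, bound in last.items()]
        if reduce(connect, tests):
            selected.append(index)
    return selected


table = [{"x": 1, "y": 5}, {"x": 3, "y": 1}, {"x": 0, "y": 0}]
cmp = [{"x": 9, "y": 9}, {"x": 2, "y": 2}]
print(cmp_query_last(table, cmp, "<", "and"))  # [2]
print(cmp_query_last(table, cmp, "<", "xor"))  # [0, 1]
```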

Numerical Expression Filtering Module (via NumExpr)

class progressivis.table.filtermod.FilterMod

Filtering module based on the numexpr library.

Module parameters:
  • expr (object) – default: unknown, a numexpr-like filtering expression

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – 📔 Data input

Output slots:

result (PTableSelectedView) – 📔 Returns a filtered view

__init__(**kwds)
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

Example:

from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.filtermod import FilterMod

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    filter_ = FilterMod(scheduler=scheduler)
    filter_.params.expr = "_1 > 0.5"
    filter_.input.table = random.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = filter_.output.result

The underlying graph:

  • random_p_table_1 (RandomPTable).result -> filter_mod_1 (FilterMod).table

  • filter_mod_1 (FilterMod).result -> sink_1 (Sink).inp

The Range Query Module

class progressivis.table.range_query.RangeQuery

Selects rows that contain values within a provided range along a given axis (column)

Module parameters:
  • column (object) – default: unknown , The column in the table input slot concerned by the query. This parameter is mandatory

  • watched_key_lower (object) – default: "", The key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ""), the column parameter is used instead.

  • watched_key_upper (object) – default: "", The key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ""), the column parameter is used instead.

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • lower (PDict) – 📔 Provides a PDict object containing the lower bound of the query. The key giving the bound is set by the watched_key_lower parameter when it is different from the column parameter.

  • upper (PDict) – 📔 Provides a PDict object containing the upper bound of the query. The key giving the bound is set by the watched_key_upper parameter when it is different from the column parameter.

  • min (PDict) – 📔 The minimum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.

  • max (PDict) – 📔 The maximum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.

  • timestamps (PDict) – required: False 📔 Gives information about bins changed between two run steps

  • index (PTable) – 📔 BinningIndex module output connected to the same input/column. This mandatory input can be provided by the create_dependent_modules() method.

Output slots:
  • result (PTableSelectedView) – 📔 Query main result

  • min (PDict) – required: False, attr_name: _min_table 📔 min value of output data

  • max (PDict) – required: False, attr_name: _max_table 📔 max value of output data
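The fallback rule of watched_key_lower and watched_key_upper (an empty string means "use the column name as key") can be made concrete with the sketch below. resolve_bounds and range_select are hypothetical helpers; treating both bounds as inclusive is an assumption of the sketch, and the module itself answers the query through the BinningIndex connected to index.

```python
from typing import Any, Dict, List, Tuple


def resolve_bounds(
    column: str,
    lower: Dict[str, Any],
    upper: Dict[str, Any],
    watched_key_lower: str = "",
    watched_key_upper: str = "",
) -> Tuple[Any, Any]:
    """Read the query bounds from the lower/upper dicts."""
    lower_key = watched_key_lower or column  # "" falls back to the column name
    upper_key = watched_key_upper or column
    return lower[lower_key], upper[upper_key]


def range_select(values: List[float], low: float, high: float) -> List[int]:
    """Linear scan; inclusive bounds are an assumption of this sketch."""
    return [i for i, v in enumerate(values) if low <= v <= high]


low, high = resolve_bounds("_1", {"_1": 0.25}, {"upper_1": 0.75}, watched_key_upper="upper_1")
print(low, high)                                      # 0.25 0.75
print(range_select([0.1, 0.3, 0.6, 0.9], low, high))  # [1, 2]
```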

__init__(approximate=False, **kwds)
create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, hist_index=None, **kwds)

Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, a min module is created and connected. With False, it is not created and not connected.

Return type:

RangeQuery

The RangeQuery module is not self-sufficient. It needs other modules to work. A simple way to provide it with an environment that allows it to work properly is to use the create_dependent_modules() method.

This convenience method creates a set of modules connected to RangeQuery that produce the inputs required for its operation in most cases.

In the example below it is called with the default values:

from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.range_query import RangeQuery

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    range_qry = RangeQuery(column="_1", scheduler=scheduler)
    range_qry.create_dependent_modules(random, "result")
    sink = Sink(scheduler=scheduler)
    sink.input.inp = range_qry.output.result
    sink.input.inp = range_qry.output.min
    sink.input.inp = range_qry.output.max

And in this case it produces the following topology:

  • random_p_table_1 (RandomPTable).result -> binning_index_1 (BinningIndex).table

  • binning_index_1 (BinningIndex).result -> range_query_1 (RangeQuery).index

  • binning_index_1 (BinningIndex).bin_timestamps -> range_query_1 (RangeQuery).timestamps

  • binning_index_1 (BinningIndex).min_out -> range_query_1 (RangeQuery).min

  • binning_index_1 (BinningIndex).max_out -> range_query_1 (RangeQuery).max

  • variable_1 (Variable).result -> range_query_1 (RangeQuery).lower

  • variable_2 (Variable).result -> range_query_1 (RangeQuery).upper

  • range_query_1 (RangeQuery).result -> sink_1 (Sink).inp

  • range_query_1 (RangeQuery).min -> sink_1 (Sink).inp

  • range_query_1 (RangeQuery).max -> sink_1 (Sink).inp

The Range Query 2D Module

class progressivis.table.range_query_2d.RangeQuery2d

Selects rows that contain values within a provided 2D interval (two columns)

Module parameters:
  • column_x (object) – default: unknown, The x axis column in the table input slot concerned by the query. This parameter is mandatory

  • column_y (object) – default: unknown, The y axis column in the table input slot concerned by the query. This parameter is mandatory

  • watched_key_lower_x (object) – default: "", The x axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ""), the column_x parameter is used instead.

  • watched_key_upper_x (object) – default: "", The x axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ""), the column_x parameter is used instead.

  • watched_key_lower_y (object) – default: "", The y axis key in the lower input slot (which is a PDict) giving the lower bound of the query. When unset (i.e. == ""), the column_y parameter is used instead.

  • watched_key_upper_y (object) – default: "", The y axis key in the upper input slot (which is a PDict) giving the upper bound of the query. When unset (i.e. == ""), the column_y parameter is used instead.

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • lower (PDict) – required: False 📔 Provides a PDict object containing the 2D lower bound of the query. The x, y axis keys giving the bound are set by the watched_key_lower_{x|y} parameters when they are different from the column_{x|y} parameters.

  • upper (PDict) – required: False 📔 Provides a PDict object containing the 2D upper bound of the query. The x, y axis keys giving the bound are set by the watched_key_upper_{x|y} parameters when they are different from the column_{x|y} parameters.

  • min (PDict) – 📔 The minimum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.

  • max (PDict) – 📔 The maximum value in the input data. This mandatory input can be provided by the create_dependent_modules() method.

  • timestamps_x (PDict) – required: False 📔 Gives information about bins changed between two run steps on the x axis

  • timestamps_y (PDict) – required: False 📔 Gives information about bins changed between two run steps on the y axis

  • index_x (PTable) – 📔 BinningIndex module output connected to the x filtering input/column. This mandatory input can be provided by the create_dependent_modules() method.

  • index_y (PTable) – 📔 BinningIndex module output connected to the y filtering input/column. This mandatory input can be provided by the create_dependent_modules() method.

Output slots:
  • result (PTableSelectedView)

  • min (PDict) – required: False, attr_name: _min_table 📔 min value of output data

  • max (PDict) – required: False, attr_name: _max_table 📔 max value of output data

__init__(approximate=False, **kwds)
Parameters:
  • approximate (bool) – approx …

  • kwds (Any) – keywords

create_dependent_modules(input_module, input_slot, min_=None, max_=None, min_value=None, max_value=None, **kwds)

Creates a default configuration containing the necessary underlying modules. Beware, {min,max}_value=None is not the same as {min,max}_value=False. With None, a min module is created and connected. With False, it is not created and not connected.

Return type:

RangeQuery2d

Just like RangeQuery, the module RangeQuery2d is not self-sufficient. In order to provide it with an environment, the create_dependent_modules() method can be used in the same way:

from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.table.range_query_2d import RangeQuery2d

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(2, rows=100000, scheduler=scheduler)
    range_qry = RangeQuery2d(column_x="_1", column_y="_2", scheduler=scheduler)
    range_qry.create_dependent_modules(random, "result")
    sink = Sink(scheduler=scheduler)
    sink.input.inp = range_qry.output.result
    sink.input.inp = range_qry.output.min
    sink.input.inp = range_qry.output.max

Categorical Query Module

class progressivis.table.categorical_query.CategoricalQuery

Selects rows whose values in a given column belong to a provided subset. It is convenient for categorical data.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • table (PTable) – 📔 input data

  • choice (PDict) – 📔 provides the subset of categorical values to be queried

Output slots:

result (PTableSelectedView)

__init__(column, choice_key='only', **kwds)
Parameters:
  • column (str) – filtering column

  • choice_key (str) – the key in the choice input giving the subset to query
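The topology shown further down feeds CategoricalQuery through a GroupBy module. The sketch below mimics that idea in plain Python: rows are first grouped by category, then the groups named in choice[choice_key] are united. group_by and categorical_query here are hypothetical helpers, not the progressivis classes.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by(values: List[Any]) -> Dict[Any, List[int]]:
    """Map each category to the indices of the rows holding it."""
    groups: Dict[Any, List[int]] = defaultdict(list)
    for index, value in enumerate(values):
        groups[value].append(index)
    return dict(groups)


def categorical_query(
    groups: Dict[Any, List[int]], choice: Dict[str, Any], choice_key: str = "only"
) -> List[int]:
    """Indices of the rows whose category belongs to choice[choice_key]."""
    selected: List[int] = []
    for category in choice[choice_key]:
        selected.extend(groups.get(category, []))  # unknown categories select nothing
    return sorted(selected)


groups = group_by(["A", "B", "C", "A"])
print(categorical_query(groups, {"only": ["A", "C"]}))  # [0, 2, 3]
```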

Like the previous query modules, CategoricalQuery is not self-sufficient. Use the create_dependent_modules() method to initialize its environment:

from progressivis.core import Sink, Scheduler
from progressivis.io import CSVLoader
from progressivis.table.categorical_query import CategoricalQuery
from progressivis.table.constant import ConstDict
from progressivis.utils.psdict import PDict

scheduler = Scheduler()
with scheduler:
    csv = CSVLoader("path/to/your/data.csv", scheduler=scheduler)
    # NB: data.csv contains a categorical column named "category"
    query = CategoricalQuery(column="category", scheduler=scheduler)
    query.create_dependent_modules(input_module=csv)
    ct = ConstDict(PDict({"only": ["A", "C"]}), scheduler=scheduler)
    query.input.choice = ct.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = query.output.result

The resulting topology:

  • csv_loader_1 (CSVLoader).result -> group_by_1 (GroupBy).table

  • group_by_1 (GroupBy).result -> categorical_query_1 (CategoricalQuery).table

  • const_dict_1 (ConstDict).result -> categorical_query_1 (CategoricalQuery).choice

  • categorical_query_1 (CategoricalQuery).result -> sink_1 (Sink).inp

Indexing Modules

Histogram Index Module

class progressivis.table.hist_index.HistogramIndex

Computes and maintains a histogram index by dividing the entire range of values of a column into a series of intervals (binning). Each bin contains the set of indices of the rows whose values fall in that interval, in order to provide fast access to these rows. In fact, HistogramIndex is able to build multiple indices, one for each column provided in the table slot hint.

Module parameters:
  • bins (int64) – default: 126

  • init_threshold (int64) – default: 1

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint

Output slots:
__init__(**kwds)
query(column, operator_, limit, approximate=False)

Return the rows matching the query. For example, returning all rows whose value in column is less than 10 (< 10) would be query(column, operator.__lt__, 10)

Return type:

PIntSet

restricted_query(column, operator_, limit, only_locs, approximate=False)

Return the rows among only_locs matching the query. For example, returning the rows of only_locs whose value in column is less than 10 (< 10) would be restricted_query(column, operator.__lt__, 10, only_locs)

Return type:

PIntSet

range_query_aslist(column, lower, upper, approximate=False)

Return the list of rows with values in range [lower, upper[

Return type:

List[PIntSet]

range_query(column, lower, upper, approximate=False)

Return the list of rows with values in range [lower, upper[

Return type:

PIntSet

restricted_range_query(column, lower, upper, only_locs, approximate=False)

Return the list of rows with values in range [lower, upper[ among only_locs

Return type:

PIntSet
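A toy, single-column version of the binning idea helps make the query methods concrete: each equal-width bin keeps the ids of its rows, interior bins of a range query are taken as a whole and only the boundary bins are checked value by value. TinyBinningIndex is hypothetical and much simpler than HistogramIndex (no incremental updates, one column only, so query takes no column argument); reading approximate=True as "keep boundary bins without checking them" is an assumption.

```python
import operator
from typing import Callable, List, Set


class TinyBinningIndex:
    """Equal-width bins over one column, each bin holding row ids (hypothetical sketch)."""

    def __init__(self, values: List[float], bins: int = 4) -> None:
        self.values = values
        self.lo, self.hi = min(values), max(values)
        self.width = (self.hi - self.lo) / bins or 1.0  # avoid a zero width on constant data
        self.bins: List[Set[int]] = [set() for _ in range(bins)]
        for row_id, value in enumerate(values):
            self.bins[self._bin_of(value)].add(row_id)

    def _bin_of(self, value: float) -> int:
        k = int((value - self.lo) / self.width)
        return min(max(k, 0), len(self.bins) - 1)  # clamp; the maximum lands in the last bin

    def range_query(self, lower: float, upper: float, approximate: bool = False) -> Set[int]:
        """Row ids with lower <= value < upper, i.e. the range [lower, upper[."""
        first, last = self._bin_of(lower), self._bin_of(upper)
        result: Set[int] = set()
        for k in range(first, last + 1):
            if approximate or first < k < last:
                # interior bins lie inside the range; approximate also keeps whole boundary bins
                result |= self.bins[k]
            else:
                # boundary bins are filtered value by value
                result |= {i for i in self.bins[k] if lower <= self.values[i] < upper}
        return result

    def query(self, operator_: Callable[[float, float], bool], limit: float) -> Set[int]:
        """Rows whose value satisfies operator_(value, limit), e.g. operator.__lt__."""
        return {i for b in self.bins for i in b if operator_(self.values[i], limit)}


index = TinyBinningIndex([1.0, 2.0, 3.5, 5.0, 8.0, 9.0], bins=4)
print(sorted(index.range_query(2.0, 8.0)))                    # [1, 2, 3]
print(sorted(index.range_query(2.0, 8.0, approximate=True)))  # [0, 1, 2, 3, 4, 5]
print(sorted(index.query(operator.__lt__, 5.0)))              # [0, 1, 2]
```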

Unique Index Module

class progressivis.table.unique_index.UniqueIndex
__init__(on, **kwds)

Data Grouping/Joining/Aggregation Modules

Join-by-id Module

class progressivis.table.join_by_id.JoinById

Module executing join.

__init__(**kwds)

Join(on=None, how='left', lsuffix='', rsuffix='', sort=False, name=None)

Join Module

class progressivis.table.join.Join

{many|one}-to-one join module

Parameters:
  • primary_on – column or list of columns giving the primary key on the primary table

  • related_on – column or list of columns giving the foreign key on the related table

  • on – shortcut when primary_on and related_on are identical

  • how (Union[Literal['inner'], Literal['outer']]) – {inner|outer}. NB: outer only provides the outer rows of the related table

  • kwds (Any) – argument to pass to the join function
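The {many|one}-to-one semantics can be sketched with a dictionary acting as a unique index on the primary key, which mirrors the UniqueIndex module that appears on the primary side of the join topology. join_rows is a hypothetical helper; its reading of how='outer' (unmatched related rows are kept, without primary columns) follows the NB above but is an assumption, as is letting primary values win on column-name clashes.

```python
from typing import Any, Dict, List


def join_rows(
    primary: List[Dict[str, Any]],
    related: List[Dict[str, Any]],
    primary_on: str,
    related_on: str,
    how: str = "inner",
) -> List[Dict[str, Any]]:
    """Each related row is matched with at most one primary row."""
    index: Dict[Any, Dict[str, Any]] = {}
    for row in primary:
        key = row[primary_on]
        if key in index:
            raise ValueError(f"duplicate primary key: {key!r}")  # primary keys must be unique
        index[key] = row
    joined: List[Dict[str, Any]] = []
    for row in related:
        match = index.get(row[related_on])
        if match is not None:
            joined.append({**row, **match})
        elif how == "outer":
            joined.append(dict(row))  # unmatched related row kept as-is
    return joined


zones = [{"LocationID": 1, "Zone": "A"}, {"LocationID": 2, "Zone": "B"}]
trips = [{"DOLocationID": 2, "fare": 10.0}, {"DOLocationID": 7, "fare": 5.5},
         {"DOLocationID": 1, "fare": 20.0}]
print([r["Zone"] for r in join_rows(zones, trips, "LocationID", "DOLocationID")])  # ['B', 'A']
print(len(join_rows(zones, trips, "LocationID", "DOLocationID", how="outer")))      # 3
```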

__init__(*, how='inner', fillna=None, inv_mask=None, **kwds)
create_dependent_modules(primary_module, related_module, *, primary_slot='result', related_slot='result', primary_cols=None, related_cols=None, on=None, primary_on=None, related_on=None, suffix='')
Parameters:
  • primary_module (Module) – module providing the primary data source (primary key owner)

  • related_module (Module) – module providing the related data source (foreign key owner)

  • primary_slot (str) –

    …

  • related_slot (str) –

    …

  • primary_cols (Optional[List[str]]) – primary table (virtual) columns to be included in the output view

  • related_cols (Optional[List[str]]) – related table columns to be included in the output view

  • primary_on (Union[str, List[str], None]) – column or list of columns giving the primary key on the primary table

  • related_on (Union[str, List[str], None]) – column or list of columns giving the foreign key on the related table

  • on (Union[str, List[str], None]) – shortcut when primary_on and related_on are identical

  • suffix (str) –

    …

Return type:

None

from progressivis.core import Sink, Scheduler
from progressivis.io import ParquetLoader, CSVLoader
from progressivis.table.join import Join

PARQUET_FILE = "path/to/yellow_tripdata_2015-01.parquet"
CSV_URL = "path/to/taxi+_zone_lookup.csv"

scheduler = Scheduler()
with scheduler:
    parquet = ParquetLoader(PARQUET_FILE, scheduler=scheduler)
    csv = CSVLoader(CSV_URL, scheduler=scheduler)
    join = Join(how="inner", scheduler=scheduler)
    join.create_dependent_modules(
        related_module=parquet,
        primary_module=csv,
        related_on=["DOLocationID"],
        primary_on=["LocationID"]
    )
    sink = Sink(scheduler=scheduler)
    sink.input.inp = join.output.result

The resulting topology:

  • parquet_loader_1 (ParquetLoader).result -> group_by_1 (GroupBy).table

  • csv_loader_1 (CSVLoader).result -> unique_index_1 (UniqueIndex).table

  • group_by_1 (GroupBy).result -> join_1 (Join).related

  • unique_index_1 (UniqueIndex).result -> join_1 (Join).primary

  • join_1 (Join).result -> sink_1 (Sink).inp

(parquet_loader_1 also exposes an anomalies output and join_1 a primary_outer output; neither is connected.)

Group-By Module

class progressivis.table.group_by.GroupBy
__init__(by, keepdims=False, **kwds)

Aggregate Module

class progressivis.table.aggregate.Aggregate
__init__(compute, **kwds)

Set and Flow Control Operations

Intersection Module

class progressivis.table.intersection.Intersection

Intersection module. It computes the intersection of the indices of all its inputs and provides a view containing the rows shared by all input tables or views. ⚠️ All inputs must be based on the same physical table. The columns of the output table are those of the common physical table.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (BasePTable) – multiple: True 📔 Many tables or views sharing the same physical table

Output slots:

result (PTableSelectedView) – 📔 View on the physical table shared by the inputs. Its index is the intersection of the input indices

__init__(**kwds)
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

Merging dictionaries Module

class progressivis.table.merge_dict.MergeDict

Gathers many (dict) outputs into one. It assumes there are no key clashes. Complementary to the Switch module
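The no-clash assumption can be made explicit in a small sketch that merges several dictionaries and refuses overlapping keys. merge_dicts is hypothetical; raising on a clash is a choice of the sketch, since the module simply assumes clashes do not happen.

```python
from typing import Any, Dict, Iterable


def merge_dicts(inputs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Gather several dictionaries into one, rejecting duplicate keys."""
    merged: Dict[str, Any] = {}
    for d in inputs:
        clash = merged.keys() & d.keys()
        if clash:
            raise KeyError(f"clashing keys: {sorted(clash)}")
        merged.update(d)
    return merged


print(merge_dicts([{"lower": 0.1}, {"upper": 0.9}]))  # {'lower': 0.1, 'upper': 0.9}
```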

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PDict) – multiple: True 📔 Multiple inputs providing dictionaries

Output slots:

result (PDict) – 📔 Merged dictionary

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass
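The merge behaves like a union of the input dictionaries. The module assumes keys never clash. The hypothetical helper below makes that assumption explicit by raising an error on a clash. Whether MergeDict itself checks for clashes is not stated here.

```python
from typing import Any, Dict, Iterable


def merge_dicts(inputs: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Union of the input dicts, refusing duplicated keys."""
    merged: Dict[str, Any] = {}
    for d in inputs:
        clash = merged.keys() & d.keys()
        if clash:
            raise ValueError(f"key clash: {sorted(clash)}")
        merged.update(d)
    return merged


print(merge_dicts([{"min_x": 0.0}, {"max_x": 9.5}]))  # {'min_x': 0.0, 'max_x': 9.5}
```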

Switching Module#

class progressivis.table.switch.Switch#

Switches the input to the result or result_else output at runtime, based on condition

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – 📔 Data input

Output slots:
  • result (PTableSelectedView) – 📔 replays the input if the condition is satisfied

  • result_else (PTableSelectedView) – 📔 replays the input if the condition fails

__init__(condition, **kwds)#
Parameters:
  • condition (Callable[..., Optional[bool]]) –

    callable which should return:

    • None => undecidable (yet), run_step must return blocked_state

    • True => run_step output is result

    • False => run_step output is result_else

  • kwds (Any) – extra keyword args to be passed to the Module superclass
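The three possible outcomes of condition can be summarized by the routing sketch below. The function route and its returned labels are hypothetical; in the module the decision happens inside run_step. The real callable is typed Callable[..., Optional[bool]], so it may receive arguments. The sketch calls it without any, which is an assumption.

```python
from typing import Callable, Optional


def route(condition: Callable[[], Optional[bool]]) -> str:
    """Map the tri-state result of `condition` to the output that gets the data."""
    decision = condition()
    if decision is None:
        # Undecidable for now: run_step must return blocked_state
        return "blocked"
    return "result" if decision else "result_else"


print(route(lambda: None), route(lambda: True), route(lambda: False))
# blocked result result_else
```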

Statistical Modules#

Histograms#

class progressivis.stats.histogram1d.Histogram1D#

Compute the histogram of a scalar, numerical column in the input table

Module parameters:
  • bins (int64) – default: 128 , the number of equal-width bins in the interval given by the min/max inputs

  • delta (float64) – default: -5 , tolerance threshold for variations in the min/max values at which the bounds are changed. Negative values represent %, positive values are absolute

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • table (PTable) – 📔 the input table

  • min (PDict) – 📔 The minimum value in the input data. It could be provided by a Min module

  • max (PDict) – 📔 The maximum value in the input data. It could be provided by a Max module

Output slots:

result (PDict) – 📔 the output dictionary. Its datashape is { array: var * int32, min: float64, max: float64, time: int64 }

__init__(column, **kwds)#
Parameters:
  • column (Union[int, str]) – the name or the position of the column to be processed

  • kwds (Any) – extra keyword args to be passed to the Module superclass

to_json(short=False, with_speed=True)#

Return a dictionary describing the module

Return type:

dict[str, Any]
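A minimal sketch of the ideas behind the bins and delta parameters, written with the stdlib. It rests on two assumptions: the bins are equal-width over [min, max], and a negative delta is a percentage of the current range (our reading of the parameter description). Values are counted into bins, and the bounds count as changed only when the new min/max move by more than the tolerance. This is not the module's implementation.

```python
from typing import List, Sequence


def equal_width_histogram(values: Sequence[float], lo: float, hi: float, bins: int) -> List[int]:
    """Count the values falling into `bins` equal-width bins over [lo, hi]."""
    if hi <= lo or bins <= 0:
        raise ValueError("need lo < hi and a positive number of bins")
    counts = [0] * bins
    width = (hi - lo) / bins
    for v in values:
        if lo <= v <= hi:
            # Clamp so that v == hi falls into the last bin
            counts[min(int((v - lo) / width), bins - 1)] += 1
    return counts


def bounds_changed(old_lo: float, old_hi: float, new_lo: float, new_hi: float, delta: float) -> bool:
    """Tell whether min/max moved by more than the tolerance `delta`."""
    # Negative delta: percentage of the current range; positive delta: absolute value
    tol = -delta / 100 * (old_hi - old_lo) if delta < 0 else delta
    return abs(new_lo - old_lo) > tol or abs(new_hi - old_hi) > tol


print(equal_width_histogram([0.0, 0.5, 1.0, 2.5, 4.0], 0.0, 4.0, bins=4))  # [2, 1, 1, 1]
print(bounds_changed(0.0, 100.0, -2.0, 100.0, delta=-5))  # False: 2 is within 5% of 100
print(bounds_changed(0.0, 100.0, -8.0, 100.0, delta=-5))  # True
```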

class progressivis.stats.histogram2d.Histogram2D#

Compute the 2D histogram of two scalar, numerical columns in the input table. These two columns are referred to as x_column and y_column here.

Module parameters:
  • xbins (int64) – default: 256 , the number of bins (as defined for Histogram1D) over the x axis

  • ybins (int64) – default: 256 , the number of bins (as defined for Histogram1D) over the y axis

  • xdelta (float64) – default: -5 , the delta threshold (as defined for Histogram1D) over the x axis

  • ydelta (float64) – default: -5 , the delta threshold (as defined for Histogram1D) over the y axis

  • history (int64) – default: 3

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • table (PTable) – 📔 the input table

  • min (PDict) – 📔 The minimum value in the input data. It could be provided by a Min module

  • max (PDict) – 📔 The maximum value in the input data. It could be provided by a Max module

Output slots:

result (PTable) – 📔 the output table

__init__(x_column, y_column, with_output=True, **kwds)#
Parameters:
  • x_column (Union[int, str]) – the name or the position of the x axis column to be processed

  • y_column (Union[int, str]) – the name or the position of the y axis column to be processed

  • kwds (Any) – extra keyword args to be passed to the Module superclass

to_json(short=False, with_speed=True)#

Return a dictionary describing the module

Return type:

dict[str, Any]

class progressivis.stats.histogram1d_categorical.Histogram1DCategorical#

Compute the histogram (i.e. the bar chart) of a categorical column in the input table

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – 📔 the input table

Output slots:

result (PDict) – 📔 occurrence counters dictionary where every key represents a categorical value

__init__(column, **kwds)#
Parameters:
  • column (Union[str, int]) – the name or the position of the column to be processed

  • kwds (Any) – extra keyword args to be passed to the Module superclass
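The output is a dictionary of occurrence counters keyed by category, so a progressive update only adds the counts of each newly arrived chunk. The stdlib sketch below uses collections.Counter. The list of chunks is an assumption that mimics successive run steps.

```python
from collections import Counter
from typing import Iterable, List


def progressive_counts(chunks: Iterable[List[str]]) -> Counter:
    counts: Counter = Counter()
    for chunk in chunks:
        # Each step only looks at the rows that arrived since the previous one
        counts.update(chunk)
    return counts


result = progressive_counts([["cash", "card"], ["card", "card", "other"]])
print(dict(result))  # {'cash': 1, 'card': 3, 'other': 1}
```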

Max / IdxMax#

class progressivis.stats.max.Max#

Computes the maximum of the values for every column of an input table.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed

Output slots:

result (PDict) – 📔 maximum values dictionary where every key represents a column

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

class progressivis.stats.idxmax.IdxMax#

Computes the indices of the maximum of the values for every column

Module parameters:
  • history (int64) – default: 3 , the number of successive results to be kept

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 the input table

Output slots:
  • max (PTable) – required: False, attr_name: _max 📔 maximum values output table

  • result (PTable) – 📔 indices in the input table of the maximum values

__init__(**kwds)#
Parameters:
  • columns – columns to be processed. When missing, all input columns are processed

  • kwds (Any) – extra keyword args to be passed to the Module superclass
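Max and IdxMax can be maintained incrementally, because each new chunk only needs to be compared with the current extremum. The sketch below models one column as hypothetical (row id, value) chunks and keeps both the maximum and the id of the row holding it. With the strict comparison, ties keep the first row seen. It ignores deletions and updates of already-seen rows, which a real progressive module may have to handle.

```python
from typing import Iterable, List, Optional, Tuple

Chunk = List[Tuple[int, float]]  # (row id, value) pairs for one column


def progressive_max(chunks: Iterable[Chunk]) -> Tuple[Optional[float], Optional[int]]:
    best_value: Optional[float] = None
    best_id: Optional[int] = None
    for chunk in chunks:
        for row_id, value in chunk:
            # Strict comparison: on ties the first row seen is kept
            if best_value is None or value > best_value:
                best_value, best_id = value, row_id
    return best_value, best_id


print(progressive_max([[(0, 3.0), (1, 7.5)], [(2, 1.0), (3, 9.25)]]))  # (9.25, 3)
```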

Min / IdxMin#

class progressivis.stats.min.Min#

Computes the minimum of the values for every column

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 the input table

Output slots:

result (PDict) – 📔 minimum values dictionary where every key represents a column

__init__(**kwds)#
Parameters:
  • columns – columns to be processed. When missing, all input columns are processed

  • kwds (Any) – extra keyword args to be passed to the Module superclass

class progressivis.stats.idxmin.IdxMin#

Computes the indices of the minimum of the values for every column

Module parameters:
  • history (int64) – default: 33 , the number of successive results to be kept

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 the input table

Output slots:
  • min (PTable) – required: False, attr_name: _min 📔 minimum values output table

  • result (PTable) – 📔 indices in the input table of the minimum values

__init__(**kwds)#

Random sampling#

class progressivis.stats.random_table.RandomPTable#

Random table generator module

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:

result (PTable) – 📔 result table

__init__(columns, rows=-1, random=<built-in method rand of numpy.random.mtrand.RandomState object>, dtype='float64', throttle=False, **kwds)#
Parameters:
  • columns (Union[int, Sequence[str]]) –

    columns definition:

    • if it is an int it provides the number (n) of columns named _1 … _n

    • otherwise it provides the sequence of column names

  • rows (int) – if positive (= n), generation stops after n rows; otherwise it is unbounded

  • random (Callable[..., ndarray[Any, Any]]) – numpy random function, default is numpy.random.rand

  • dtype (str) – numpy-like data type

  • throttle (Union[int, integer[Any], float, bool]) – limit the number of rows to be generated in a step

  • kwds (Any) – extra keyword args to be passed to the Module superclass
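The columns and rows arguments can be illustrated with the stdlib sketch below. An int columns value yields the names _1 … _n, and a positive rows bounds the total number of generated rows. The per-step limit is modeled by a hypothetical step_size argument standing in for throttle, and the stdlib random module replaces numpy. This mimics the documented behavior only; it is not the module's code.

```python
import random
from typing import Dict, Iterator, List, Sequence, Union


def column_names(columns: Union[int, Sequence[str]]) -> List[str]:
    """int n -> ['_1', ..., '_n']; otherwise the given names."""
    if isinstance(columns, int):
        return [f"_{i}" for i in range(1, columns + 1)]
    return list(columns)


def generate(columns: Union[int, Sequence[str]], rows: int, step_size: int, seed: int = 0) -> Iterator[Dict[str, List[float]]]:
    """Yield chunks of at most `step_size` rows; rows <= 0 means unbounded."""
    if step_size <= 0:
        raise ValueError("step_size must be positive")
    rng = random.Random(seed)
    names = column_names(columns)
    produced = 0
    while rows <= 0 or produced < rows:
        n = step_size if rows <= 0 else min(step_size, rows - produced)
        yield {name: [rng.random() for _ in range(n)] for name in names}
        produced += n


chunks = list(generate(3, rows=10, step_size=4))
print(column_names(3))  # ['_1', '_2', '_3']
print([len(c["_1"]) for c in chunks])  # [4, 4, 2]
```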

class progressivis.stats.blobs_table.BlobsPTable#

Isotropic Gaussian blobs table generator based on sklearn.datasets.make_blobs function

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:
__init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, centers, **kwds)#
Parameters:
  • columns (Union[int, List[str], ndarray[Any, Any]]) –

    columns definition:

    • if it is an int it provides the number (n) of columns named _1 … _n

    • otherwise it provides the sequence of column names

  • rows (int) – if positive (= n), generation stops after n rows; otherwise it is unbounded

  • dtype (str) – numpy-like data type

  • seed (int) – used to initialize the random number generator

  • throttle (Union[int, bool, float]) – limit the number of rows to be generated in a step

  • centers (Any) – number of centers to generate, or the fixed center locations

  • kwds (Any) – extra keyword args to be passed to sklearn.datasets.make_blobs

class progressivis.stats.blobs_table.MVBlobsPTable#

The multivariate normal distribution blobs table generator based on numpy.random.multivariate_normal function

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Output slots:
__init__(columns, rows=-1, dtype='float64', seed=0, throttle=False, *, means, covs, **kwds)#
Parameters:
  • columns (Union[int, List[str], ndarray[Any, Any]]) –

    columns definition:

    • if it is an int it provides the number (n) of columns named _1 … _n

    • otherwise it provides the sequence of column names

  • rows (int) – if positive (= n), generation stops after n rows; otherwise it is unbounded

  • dtype (str) – numpy-like data type

  • seed (int) – used to initialize the random number generator

  • throttle (Union[int, bool, float]) – limit the number of rows to be generated in a step

  • means (Any) – sequence of 1-D array_like of length N for N-dimensional blobs

  • covs (Any) – sequence of 2-D array_like of shape (N, N), each representing the covariance matrix of the distribution for N-dimensional blobs

  • kwds (Any) – extra keyword args to be passed to numpy.random.multivariate_normal

Stats#

class progressivis.stats.stats.Stats#

Computes the minimum and the maximum of the values for a single column

Module parameters:
  • history (int64) – default: 3 , the number of successive results to be kept

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – 📔 the input table

Output slots:

result (PTable) – 📔 result table containing two columns (for min and max value)

__init__(column, min_column=None, max_column=None, reset_index=False, **kwds)#
Parameters:
  • column (Union[str, int]) – the name or the position of the column to be processed

  • min_column (Optional[str]) – the name of the minimum column in the result table. When missing, the given name is _<column>_min

  • max_column – the name of the maximum column in the result table. When missing, the given name is _<column>_max

  • kwds (Any) – extra keyword args to be passed to the Module superclass
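A sketch of the incremental min/max update for a single column, using the default result names _<column>_min and _<column>_max described above. The chunked input and the function name are illustrative assumptions. The real module also accepts a column position, which this sketch does not handle.

```python
from typing import Dict, Iterable, List, Optional


def progressive_stats(column: str, chunks: Iterable[List[float]],
                      min_column: Optional[str] = None,
                      max_column: Optional[str] = None) -> Dict[str, float]:
    min_name = min_column or f"_{column}_min"
    max_name = max_column or f"_{column}_max"
    lo, hi = float("inf"), float("-inf")
    for chunk in chunks:
        if chunk:
            # Only the new rows are scanned at each step
            lo = min(lo, min(chunk))
            hi = max(hi, max(chunk))
    return {min_name: lo, max_name: hi}


print(progressive_stats("fare", [[3.5, 12.0], [0.5, 8.0]]))
# {'_fare_min': 0.5, '_fare_max': 12.0}
```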

Sample#

class progressivis.stats.sample.Sample#

Reservoir Sampling module

Module parameters:
  • samples (int64) – default: 50

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable)

Output slots:
__init__(required='result', **kwds)#
Parameters:

required (str) – either "result" or "select"; when required == "select", the select output is mandatory
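Reservoir sampling keeps a uniform random sample of fixed size (here, the samples parameter) from a stream whose length is unknown in advance, which suits progressive loading. The sketch below is the classic Algorithm R, written with the stdlib. Whether Sample uses exactly this variant is not stated in this documentation.

```python
import random
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def reservoir_sample(stream: Iterable[T], k: int, seed: int = 0) -> List[T]:
    rng = random.Random(seed)
    reservoir: List[T] = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items
            reservoir.append(item)
        else:
            # Keep item i with probability k / (i + 1), evicting a random slot
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir


sample = reservoir_sample(range(10_000), k=50)
print(len(sample))  # 50
```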

Variance#

class progressivis.stats.var.Var#

Computes the variance for every column of an input table.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 the input table

Output slots:

result (PDict) – 📔 variances dictionary where every key represents a column

__init__(ignore_string_cols=False, **kwds)#
Parameters:
  • ignore_string_cols (bool) – silently ignore str columns if True (else raise an exception)

  • kwds (Any) – extra keyword args to be passed to the Module superclass
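Variance can be computed progressively. For each column, keep the running count, mean and sum of squared deviations (M2), then merge each new chunk with the pairwise update formula of Chan et al. The stdlib sketch below does this for one column. It returns the population variance (M2 / n). That normalization is an assumption, because the module's choice is not stated here.

```python
from typing import Iterable, List, Tuple

State = Tuple[int, float, float]  # (count, mean, M2 = sum of squared deviations)


def merge_chunk(state: State, chunk: List[float]) -> State:
    n_a, mean_a, m2_a = state
    n_b = len(chunk)
    if n_b == 0:
        return state
    mean_b = sum(chunk) / n_b
    m2_b = sum((x - mean_b) ** 2 for x in chunk)
    n = n_a + n_b
    delta = mean_b - mean_a
    # Combine the running statistics with those of the new chunk
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta * delta * n_a * n_b / n
    return n, mean, m2


def progressive_variance(chunks: Iterable[List[float]]) -> float:
    state: State = (0, 0.0, 0.0)
    for chunk in chunks:
        state = merge_chunk(state, chunk)
    n, _, m2 = state
    return m2 / n if n else float("nan")


# Population variance of 2, 4, 4, 4, 5, 5, 7, 9 is 4 (up to rounding)
print(progressive_variance([[2.0, 4.0, 4.0], [4.0, 5.0, 5.0, 7.0, 9.0]]))
```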

class progressivis.stats.var.VarH#

Computes the variance of the columns of an input table. This variant keeps a history of results

Module parameters:
  • history (int64) – default: 3 , the number of successive results to be kept

  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 the input table

Output slots:

result (PTable) – 📔 result table

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

Clustering Modules#

Linear Algebra Modules#

Element-wise processing modules#

These modules apply numpy universal functions (a.k.a. ufuncs) to the columns of one or two input tables.

Depending on the applied ufunc arity, we distinguish several categories of modules.

Unary modules#

These modules apply a unary ufunc on all columns or on a subset of columns of the input table, for example:

class progressivis.linalg.Absolute#

Applies numpy.absolute over all input columns or over a subset

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed

Output slots:

result (PTable) – required: False, datashape: {‘table’: ‘#columns’} 📔 The output table follows the structure of table

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass
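A unary module applies one element-wise function to every selected column while keeping the column names. A stdlib sketch of that behavior is below. The table is modeled as a dict of column lists, and the optional columns argument plays the role of the slot hint. The sketch assumes that, with a hint, only the selected columns appear in the output. The real modules apply numpy ufuncs to PTable columns.

```python
from typing import Callable, Dict, List, Optional, Sequence

Table = Dict[str, List[float]]


def apply_unary(func: Callable[[float], float], table: Table,
                columns: Optional[Sequence[str]] = None) -> Table:
    # Without a hint, every input column is processed
    selected = list(table) if columns is None else list(columns)
    # The output keeps the names of the processed columns
    return {name: [func(v) for v in table[name]] for name in selected}


data = {"_1": [-1.5, 2.0], "_2": [3.0, -4.0], "_3": [0.0, -0.5]}
print(apply_unary(abs, data, ["_1", "_3"]))  # {'_1': [1.5, 2.0], '_3': [0.0, 0.5]}
```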

The other unary modules have the same interface as Absolute module above. They are:

Unary modules#

Module name     underlying Universal Function
Absolute        absolute
Arccos          arccos
Arccosh         arccosh
Arcsin          arcsin
Arcsinh         arcsinh
Arctan          arctan
Arctanh         arctanh
Cbrt            cbrt
Ceil            ceil
Conjugate       conjugate
Cos             cos
Cosh            cosh
Deg2rad         deg2rad
Degrees         degrees
Exp             exp
Exp2            exp2
Expm1           expm1
Fabs            fabs
Floor           floor
Frexp           frexp
Invert          invert
Isfinite        isfinite
Isinf           isinf
Isnan           isnan
Isnat           isnat
Log             log
Log10           log10
Log1p           log1p
Log2            log2
LogicalNot      logical_not
Modf            modf
Negative        negative
Positive        positive
Rad2deg         rad2deg
Radians         radians
Reciprocal      reciprocal
Rint            rint
Sign            sign
Signbit         signbit
Sin             sin
Sinh            sinh
Spacing         spacing
Sqrt            sqrt
Square          square
Tan             tan
Tanh            tanh
Trunc           trunc

Binary modules#

These modules apply a binary ufunc on two sets of columns belonging either to the same input table (for example ColsAdd below) or to two distinct tables (for example Add below):

class progressivis.linalg.Add#

Applies numpy.add over two sets of columns. One of them belongs to the first input table and the other belongs to the second

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • first (PTable) – accepts Sequence[str] as hint 📔 Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed

  • second (PTable) – accepts Sequence[str] as hint 📔 Similar to first

Output slots:

result (PTable) – required: False, datashape: {‘first’: ‘#columns’} 📔 The output table follows the structure of first

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

Examples:

from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.linalg import Add

#
# each column of random1 is added to the column at the same position in random2
#

scheduler = Scheduler()
with scheduler:
    random1 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(3, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result
    module.input.second = random2.output.result
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result

#
# columns _3, _5, _7 of random1 are added to columns _4, _6, _8 of random2
# (both tables need at least 8 columns, named _1 ... _n)
#

scheduler = Scheduler()
with scheduler:
    random1 = RandomPTable(10, rows=100_000, scheduler=scheduler)
    random2 = RandomPTable(10, rows=100_000, scheduler=scheduler)
    module = Add(scheduler=scheduler)
    module.input.first = random1.output.result["_3", "_5", "_7"]
    module.input.second = random2.output.result["_4", "_6", "_8"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result

class progressivis.linalg.ColsAdd#

Applies numpy.add over two sets of columns belonging to the same input table

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Tuple[Sequence[str], Sequence[str]] as hint 📔 The two items of the hint are lists of columns used to select the operands

Output slots:

result (PTable) – required: False

__init__(cols_out=None, **kwds)#
Parameters:
  • cols_out (Optional[List[str]]) – denotes the names of columns in the result table. If not provided the column selection of the first operand is used

  • kwds (Any) – extra keyword args to be passed to the Module superclass

Example:

from progressivis.core import Sink, Scheduler
from progressivis.stats import RandomPTable
from progressivis.linalg import ColsAdd

#
# columns _3, _5, _7 are added to columns _4, _6, _8
# (the table needs at least 8 columns, named _1 ... _n)
#

scheduler = Scheduler()
with scheduler:
    random = RandomPTable(10, rows=100_000, scheduler=scheduler)
    module = ColsAdd(
        cols_out=["x", "y", "z"],
        scheduler=scheduler,
    )
    module.input.table = random.output.result[["_3", "_5", "_7"], ["_4", "_6", "_8"]]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result

The other binary modules have the same interface as the Add and ColsAdd modules above. They are:

Binary and reduce modules#

Module name                                           underlying Universal Function
Add / ColsAdd / AddReduce                             add
Arctan2 / ColsArctan2 / Arctan2Reduce                 arctan2
BitwiseAnd / ColsBitwiseAnd / BitwiseAndReduce        bitwise_and
BitwiseOr / ColsBitwiseOr / BitwiseOrReduce           bitwise_or
BitwiseXor / ColsBitwiseXor / BitwiseXorReduce        bitwise_xor
Copysign / ColsCopysign / CopysignReduce              copysign
Divide / ColsDivide / DivideReduce                    divide
Divmod / ColsDivmod / DivmodReduce                    divmod
Equal / ColsEqual / EqualReduce                       equal
FloatPower / ColsFloatPower / FloatPowerReduce        float_power
FloorDivide / ColsFloorDivide / FloorDivideReduce     floor_divide
Fmax / ColsFmax / FmaxReduce                          fmax
Fmin / ColsFmin / FminReduce                          fmin
Fmod / ColsFmod / FmodReduce                          fmod
Gcd / ColsGcd / GcdReduce                             gcd
Greater / ColsGreater / GreaterReduce                 greater
GreaterEqual / ColsGreaterEqual / GreaterEqualReduce  greater_equal
Heaviside / ColsHeaviside / HeavisideReduce           heaviside
Hypot / ColsHypot / HypotReduce                       hypot
Lcm / ColsLcm / LcmReduce                             lcm
Ldexp / ColsLdexp / LdexpReduce                       ldexp
LeftShift / ColsLeftShift / LeftShiftReduce           left_shift
Less / ColsLess / LessReduce                          less
LessEqual / ColsLessEqual / LessEqualReduce           less_equal
Logaddexp / ColsLogaddexp / LogaddexpReduce           logaddexp
Logaddexp2 / ColsLogaddexp2 / Logaddexp2Reduce        logaddexp2
LogicalAnd / ColsLogicalAnd / LogicalAndReduce        logical_and
LogicalOr / ColsLogicalOr / LogicalOrReduce           logical_or
LogicalXor / ColsLogicalXor / LogicalXorReduce        logical_xor
Maximum / ColsMaximum / MaximumReduce                 maximum
Minimum / ColsMinimum / MinimumReduce                 minimum
Mod / ColsMod / ModReduce                             mod
Multiply / ColsMultiply / MultiplyReduce              multiply
Nextafter / ColsNextafter / NextafterReduce           nextafter
NotEqual / ColsNotEqual / NotEqualReduce              not_equal
Power / ColsPower / PowerReduce                       power
Remainder / ColsRemainder / RemainderReduce           remainder
RightShift / ColsRightShift / RightShiftReduce        right_shift
Subtract / ColsSubtract / SubtractReduce              subtract
TrueDivide / ColsTrueDivide / TrueDivideReduce        true_divide

Reduce modules#

These modules reduce each input column to a single value (i.e. they reduce its dimension by one) by applying the reduce method of a ufunc, for example:

class progressivis.linalg.AddReduce#

Applies numpy.add.reduce over all input columns or over a subset of them

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:

table (PTable) – accepts Sequence[str] as hint 📔 Data entry. A selection of columns to be processed could be provided via a hint (see example). When no hint is provided all input columns are processed

Output slots:

result (PDict) – 📔 The keys are the names of the input table columns

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass
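A reduce module collapses each processed column to one value and returns a dictionary keyed by column name. For numpy.add, this is the column sum. The stdlib sketch below shows the shape of the output. It uses functools.reduce on a hypothetical dict-of-lists table, and operator.add stands in for numpy.add.

```python
import operator
from functools import reduce
from typing import Callable, Dict, List, Optional, Sequence


def reduce_columns(op: Callable[[float, float], float], table: Dict[str, List[float]],
                   columns: Optional[Sequence[str]] = None) -> Dict[str, float]:
    # Without a hint, every input column is reduced
    selected = list(table) if columns is None else list(columns)
    # One scalar per column: the keys follow the input column names
    return {name: reduce(op, table[name]) for name in selected}


data = {"_1": [1.0, 2.0, 3.0], "_2": [10.0, 20.0, 30.0]}
print(reduce_columns(operator.add, data))  # {'_1': 6.0, '_2': 60.0}
```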

The other reduce modules have the same interface as the AddReduce module above. They are listed in the previous table.

Decorators#

One can transform a simple function into an element-wise module via three decorators: @unary_module, @binary_module and @reduce_module:

@progressivis.linalg.unary_module(func)#
Return type:

type

@progressivis.linalg.binary_module(func)#
Return type:

type

@progressivis.linalg.reduce_module(func)#
Return type:

type

Examples:

import numpy as np
from progressivis.linalg import unary_module, binary_module, reduce_module
from typing import cast


@unary_module
def CustomUnary(x: float) -> float:
    return cast(float, (x + np.sin(x)) / (x + np.cos(x)))


@binary_module
def CustomBinary(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))


@reduce_module
def CustomBinaryReduce(x: float, y: float) -> float:
    return cast(float, (x + np.sin(y)) / (x + np.cos(y)))

Declarative modules#

One can create new modules in a declarative way as subclasses of linalg.mixufunc.MixUfuncABC.

Examples:

import numpy as np
from progressivis.linalg.mixufunc import MixUfuncABC
from progressivis.table.table import PTable
from progressivis.core.module import def_input, def_output
from typing import Any


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class MixUfuncSample(MixUfuncABC):
    """
    Explanation: the result table has two columns "_1" and "_2" which are calculated
    with the underlying expressions
    NB: Here, columns in first and second table are supposed to be _1, _2, ...
    """

    expr = {"_1": (np.add, "first._2", "second._3"), "_2": (np.log, "second._3")}


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncSample2(MixUfuncABC):
    """
    The output types can be coerced if necessary
    """

    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }


def custom_unary(x: float) -> float:
    return (x + np.sin(x)) / (x + np.cos(x))  # type: ignore


custom_unary_ufunc: Any = np.frompyfunc(custom_unary, 1, 1)


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomUnary(MixUfuncABC):
    """
    Module using a custom unary function
    """

    expr = {
        "_1:float64": (np.add, "first._2", "second._3"),
        "_2:float64": (custom_unary_ufunc, "second._3"),
    }


def custom_binary(x: float, y: float) -> float:
    return (x + np.sin(y)) / (x + np.cos(y))  # type: ignore


custom_binary_ufunc: Any = np.frompyfunc(custom_binary, 2, 1)


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class MixUfuncCustomBinary(MixUfuncABC):
    """
    Module using a custom binary function
    """

    expr = {
        "_1:float64": (custom_binary_ufunc, "first._2", "second._3"),
        "_2:float64": (np.log, "second._3"),
    }
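To make the expr mapping concrete, here is a hypothetical stdlib interpreter for it. Each key names an output column, optionally suffixed with :dtype. Each value is a function followed by "slot.column" operands. The sketch only shows how the declarations read; it is not how MixUfuncABC is implemented. The math and operator functions stand in for numpy ufuncs, and the dtype suffix is parsed but ignored.

```python
import math
import operator
from typing import Any, Dict, List, Tuple

Columns = Dict[str, List[float]]


def evaluate_expr(expr: Dict[str, Tuple[Any, ...]], tables: Dict[str, Columns]) -> Columns:
    """Hypothetical interpreter for a MixUfuncABC-style `expr` mapping."""
    result: Columns = {}
    for key, (func, *operands) in expr.items():
        # "_2:float64" -> output name "_2" (the dtype suffix is ignored here)
        name = key.partition(":")[0]
        # Resolve each "slot.column" operand, e.g. "second._3"
        args = [tables[slot][col] for slot, col in (op.split(".", 1) for op in operands)]
        # Apply the function element-wise, row by row
        result[name] = [func(*row) for row in zip(*args)]
    return result


tables = {"first": {"_2": [1.0, 2.0]}, "second": {"_3": [1.0, math.e]}}
expr = {"_1": (operator.add, "first._2", "second._3"), "_2:float64": (math.log, "second._3")}
print(evaluate_expr(expr, tables))
```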

Declarative modules based on numexpr expressions can be created using NumExprABC as follows:

from progressivis.linalg.nexpr import NumExprABC
from progressivis.table.table import PTable
from progressivis.core.module import def_input, def_output


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False, datashape={"first": ["_1", "_2"]})
class NumExprSample(NumExprABC):
    """
    NB: Here, columns in first and second table are supposed to be _1, _2, ...
    """

    expr = {"_1": "{first._2}+2*{second._3}", "_2": "{first._3}-5*{second._2}"}


@def_input("first", type=PTable)
@def_input("second", type=PTable)
@def_output("result", type=PTable, required=False)
class NumExprSample2(NumExprABC):
    """
    The output types can be coerced if necessary
    """

    expr = {
        "_1:float64": "{first._2}+2*{second._3}",
        "_2:float64": "{first._3}-5*{second._2}",
    }

Linear mapping#

Linear transformation can be performed via this module:

class progressivis.linalg.linear_map.LinearMap#

Performs a linear transformation on the rows of the vectors table. The transformation table (or a subset of its columns) provides the transformation matrix once all its rows have been read. Its number of rows must equal the number of columns of vectors.

Module parameters:
  • quantum (float64) – default: 0.5

  • debug (bool) – default: False

Input slots:
  • vectors (PTable) – accepts Sequence[str] as hint 📔 Table providing the row vectors. A selection of columns could be provided via a hint (see example). When no hint is provided all input columns are used

  • transformation (PTable) – accepts Sequence[str] as hint 📔 Table providing the transformation matrix. A selection of columns could be provided via a hint (same behaviour as vectors)

Output slots:

result (PTable) – 📔 Result table, follows the structure of transformation

__init__(**kwds)#
Parameters:

kwds (Any) – extra keyword args to be passed to the Module superclass

Example:

from progressivis.core import Sink, Scheduler
from progressivis.linalg.linear_map import LinearMap
from progressivis.stats import RandomPTable

scheduler = Scheduler()

with scheduler:
    vectors = RandomPTable(20, rows=100000, scheduler=scheduler)
    transf = RandomPTable(20, rows=3, scheduler=scheduler)
    module = LinearMap(scheduler=scheduler)
    module.input.vectors = vectors.output.result["_3", "_4", "_5"]
    module.input.transformation = transf.output.result["_4", "_5", "_6", "_7"]
    sink = Sink(scheduler=scheduler)
    sink.input.inp = module.output.result
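The arithmetic is a matrix product. Each row vector v of length k (the number of selected vectors columns) is multiplied by the k × m transformation matrix. Here m is the number of selected transformation columns, so each result row has m values. In the example above, k = 3 and m = 4. A stdlib sketch with plain lists standing in for the tables:

```python
from typing import List

Matrix = List[List[float]]


def linear_map(vectors: Matrix, transformation: Matrix) -> Matrix:
    k = len(transformation)
    # The number of transformation rows must equal the number of vector columns
    if any(len(v) != k for v in vectors):
        raise ValueError("vector length must equal the number of transformation rows")
    m = len(transformation[0])
    # result[r][j] = sum_i vectors[r][i] * transformation[i][j]
    return [[sum(v[i] * transformation[i][j] for i in range(k)) for j in range(m)] for v in vectors]


vectors = [[1.0, 2.0, 3.0]]
transformation = [[1.0, 0.0, 0.0, 1.0], [0.0, 1.0, 0.0, 1.0], [0.0, 0.0, 1.0, 1.0]]
print(linear_map(vectors, transformation))  # [[1.0, 2.0, 3.0, 6.0]]
```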

Visualisation#

Utility#

Format Adaptors#