Library Reference#

First level variables#

progressivis.__version__#
The version of the progressivis package.

Dataflow Management#

Under the hood, a ProgressiVis program is a dataflow graph, stored in a Dataflow object. When creating or updating a program, a dataflow graph is updated and validated before it is run by the scheduler. There is a two-phase-commit cycle to make sure the currently running dataflow is not broken by an invalid modification.

Once the dataflow is valid, it is run by the scheduler in a round-robin fashion.
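The round-robin execution can be pictured with a small standalone sketch (plain Python; the queue, module names, and run counter below are illustrative, not the actual Scheduler internals):

```python
from collections import deque

def run_round_robin(modules, max_runs=6):
    """Run each module in turn, cycling through the queue; each run
    gets a bounded time slice (elided here) before yielding."""
    queue = deque(modules)
    run_number = 0
    trace = []
    while queue and run_number < max_runs:
        module = queue.popleft()
        run_number += 1          # each module run increments the run number
        trace.append((run_number, module))
        queue.append(module)     # round-robin: back to the end of the queue
    return trace

trace = run_round_robin(["csv_loader", "max", "print_max"])
```

In the real scheduler, a module that has no buffered input is blocked and skipped until new data arrives; the sketch omits that state management.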

Scheduler#

The Scheduler is in charge of running a ProgressiVis program, made of a sorted list of modules.

class progressivis.core.scheduler.Scheduler#

A Scheduler runs progressive modules

default: Scheduler = <progressivis.core.scheduler.Scheduler object>#

Default scheduler, used implicitly when not specified in ProgressiVis methods

property name: str#

Return the scheduler id

timer()#

Return the scheduler timer.

Return type:

float

run_queue_length()#

Return the length of the run queue

Return type:

int

to_json(short=True)#

Return a dictionary describing the scheduler

Return type:

Dict[str, Any]

static set_default()#

Set the default scheduler.

Return type:

None

async step()#

Run the scheduler for one step.

Return type:

None

on_tick(proc, delay=-1)#

Set a procedure to call at each tick.

Return type:

None

remove_tick(proc)#

Remove a tick callback

Return type:

None

on_tick_once(proc)#

Add a one-shot procedure that will be run at the next scheduler tick. This is especially useful for setting up module connections.

Return type:

None

remove_tick_once(proc)#

Remove a tick once callback

Return type:

None

on_idle(proc, delay=-1)#

Set a procedure that will be called when there is nothing else to do.

Return type:

None

remove_idle(idle_proc)#

Remove an idle callback.

Return type:

None

remove_loop(idle_proc)#

Remove a loop callback.

Return type:

None

all_blocked()#

Return True if all the modules are blocked, False otherwise

Return type:

bool

is_waiting_for_input()#

Return True if there is at least one input module

Return type:

bool

no_more_data()#

Return True if no module has data input left to process.

Return type:

bool

commit()#

Force a pending dataflow to be committed

Return type:

None

Returns:

None

async stop()#

Stop the execution.

Return type:

None

is_running()#

Return True if the scheduler is currently running.

Return type:

bool

is_stopped()#

Return True if the scheduler is stopped.

Return type:

bool

is_terminated()#

Return True if the scheduler is terminated.

Return type:

bool

exists(moduleid)#

Return True if the moduleid exists in this scheduler.

Return type:

bool

modules()#

Return the dictionary of modules.

Return type:

Dict[str, Module]

run_number()#

Return the last run number.

Each time a module is run by the scheduler, the run_number is incremented.

Return type:

int

has_input()#

Return True if the scheduler is in input mode

Return type:

bool

close_all()#

Close all the resources associated with this scheduler.

Return type:

None

Dataflow#

To create or update a program, you need to get a Dataflow object. A scheduler is a context manager returning a Dataflow. Use it like this:

scheduler = Scheduler.default
with scheduler as dataflow:
    m = Max(name="max", scheduler=scheduler)
    prt = Print(name="print_max", scheduler=scheduler)
    m.input.table = table.output.result
    prt.input.df = m.output.result
    dataflow.delete_modules("min", "print_min")

This example creates and adds two new modules (max and print_max) to the current Dataflow and removes two others (min and print_min).

The context manager can succeed, updating the scheduler with the new dataflow, or fail, raising an error and leaving the scheduler unchanged. In that case, the exception contains a structured error message explaining the problems in human terms. Alternatively, the Dataflow.validate() method returns a list of errors (possibly empty) in the current Dataflow that can be fixed before the context manager fails.
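The validate-then-commit cycle can be sketched with a minimal standalone class (conceptual only; the real Dataflow tracks slots and modules, not the plain name-to-dependencies mapping used here):

```python
class StagedProgram:
    """Minimal sketch of a two-phase-commit update cycle: modifications
    are staged, validated, and only committed if valid."""
    def __init__(self):
        self.running = {}    # the committed program: name -> input names
        self.pending = None  # the staged modification

    def stage(self, modules):
        self.pending = dict(self.running)
        self.pending.update(modules)

    def validate(self):
        # a module is invalid if one of its inputs is missing
        errors = []
        for name, inputs in self.pending.items():
            for dep in inputs:
                if dep not in self.pending:
                    errors.append(f"{name}: missing input {dep}")
        return errors

    def commit(self):
        errors = self.validate()
        if errors:  # the running program is left untouched on failure
            raise ValueError(errors)
        self.running, self.pending = self.pending, None

prog = StagedProgram()
prog.stage({"table": [], "max": ["table"]})
prog.commit()                        # valid: becomes the running program
prog.stage({"print_max": ["min"]})   # refers to a missing "min" module
errors = prog.validate()             # non-empty: commit would fail
```

The key property mirrored from ProgressiVis is that a failed commit leaves the running program exactly as it was.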

class progressivis.core.dataflow.Dataflow#

Class managing a Dataflow, a configuration of modules and slots constructed by the user to be run by a Scheduler.

The contents of a Dataflow can be changed at any time without interfering with the Scheduler. To update the Scheduler, the Dataflow should be validated first, then committed.

scheduler: Scheduler#

Scheduler associated with this Dataflow

clear()#

Remove all the modules from the Dataflow

Return type:

None

modules()#

Return all the modules in this dataflow

Return type:

Dict[str, Module]

dir()#

Return the name of all the modules

Return type:

List[str]

get_visualizations()#

Return the visualization modules

Return type:

List[str]

get_inputs()#

Return the input modules

Return type:

List[str]

aborted()#

The dataflow has been aborted before being sent.

Return type:

None

committed()#

The dataflow has been sent to the scheduler.

Return type:

None

add_module(module)#

Add a module to this Dataflow.

Return type:

None

add_connection(slot, rename=True)#

Declare a connection between two module slots

Return type:

None

connect(output_module, output_name, input_module, input_name)#

Declare a connection between two module slots

Return type:

None

order_modules(dependencies=None)#

Compute a topological order for the modules.

Return type:

List[str]
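The ordering computed by order_modules() is a standard topological sort of the dependency graph. The Python standard library's graphlib illustrates the same idea on a hypothetical dependency graph (module names are made up for the example):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each module maps to the modules it reads from.
dependencies = {
    "csv_loader": set(),
    "max": {"csv_loader"},
    "min": {"csv_loader"},
    "print_max": {"max"},
}

# static_order() yields each module after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
```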

collect_dependencies()#

Return the dependencies of the modules

Return type:

Dict[str, Set[str]]

validate()#

Validate the Dataflow, returning [] if it is valid or the invalid modules otherwise.

Return type:

List[str]

collateral_damage(*names)#

Return the set of modules that would be deleted when the specified ones are deleted.

Parameters:

names – modules to delete

Returns:

set of modules relying on or feeding the specified modules

Return type:

set

die_if_deps_die(name, deps, maybe_deps)#

Return True if the module would die if the deps modules die, False if not, None if not sure.

Parameters:
  • deps (Set[str]) – a set of module names that will die

  • maybe_deps (Set[str]) – a set of module names that could die

Returns:

True if the module dies, False if it does not, None if not sure

Return type:

Boolean or None

Module#

A Module plays the role of a function in a conventional language. It provides a set of functionalities:

  • connection,

  • validation,

  • execution,

  • control,

  • naming, tagging, and

  • interaction.

The Module class is an abstract base class and cannot be instantiated. All the concrete modules inherit from this class.

class progressivis.core.module.Module#

Base Class for progressivis modules.

Warning

Do not instantiate this class directly!

parameters: List[Tuple[str, dtype[Any], Any]] = [('quantum', dtype('float64'), 0.5), ('debug', dtype('bool'), False)]#

list of parameters managed by modules of this class

TAG_VISUALIZATION = 'visualization'#

Tag attached to modules managing a visualization

TAG_INPUT = 'input'#

Tag attached to input modules

TAG_SOURCE = 'source'#

Tag attached to source modules

TAG_GREEDY = 'greedy'#

Tag attached to greedy modules

TAG_DEPENDENT = 'dependent'#

Tag attached to dependent modules

name#

The module’s name

predictor#

The Time Predictor used by this module

tags: Set[str]#

The set of tags attached to this module

order: int#

The order of this module in the scheduler, or -1 if not valid

group: Optional[str]#

The group this module belongs to

default_step_size: int#

step size used by default when running this module for the first time

input#

input slots are created and accessed through this write-only attribute

output#

output slots are created and accessed through this read-only attribute

static tagged(*tags)#

Create a context manager to add tags to a set of modules created within a scope, typically dependent modules.

Return type:

ModuleTag

grouped()#

Create a context manager to add a group to a set of modules created within a scope, typically dependent modules.

Return type:

GroupContext

scheduler()#

Return the scheduler associated with the module.

Return type:

Scheduler

dataflow()#

Return the dataflow associated with the module at creation time.

Return type:

Optional[Dataflow]

classmethod make(factory, name=None, *, scheduler=None, **kwds)#

Get a module of my class if it is already registered, or create a module of my class, register it, and return it.

Parameters:
  • factory (ModuleFactory) – The ModuleFactory registering the modules by name

  • name (str, optional) – The name used to access this module, which can sometimes differ from the class name.

  • kwds – keyword parameters used to create the instance if it does not exist.

Return type:

A module of this type.

make_connections(factory, name)#

Create the connections after a module has been created by make

Return type:

None

get_progress()#

Return a tuple (current, total), where current is the current progress value and total is the total number of values to process; these values can change during the computation.

Return type:

Tuple[int, int]

get_quality()#

Return a quality value, which should increase as the quality of the results improves.

Return type:

float

property debug: bool#

Return the value of the debug property

generate_table_name(name)#

Return a unique name for this module

Return type:

str

timer()#

Return the timer associated with this module

Return type:

float

to_json(short=False, with_speed=True)#

Return a dictionary describing the module

Return type:

dict[str, Any]

async from_input(msg)#

Catch and process a message from an interaction

Return type:

str

is_input()#

Return True if this module is an input module

Return type:

bool

is_data_input()#

Return True if this module brings new data

Return type:

bool

get_image(run_number=None)#

Return an image created by this module or None

Return type:

Any

describe()#

Print the description of this module

Return type:

None

has_any_input()#

Return True if the module has any input

Return type:

bool

get_input_slot(name)#

Return the specified input slot

Return type:

Slot

get_input_module(name)#

Return the specified input module

Return type:

Optional[Module]

validate()#

Called when the module has been validated

Return type:

None

default_input()#

Return the input slot considered as the default for this module

Return type:

Union[str, int]

abstract run_step(run_number, step_size, howlong)#

Run one step of the module, with a duration up to the ‘howlong’ parameter.

Returns a dictionary with at least 5 pieces of information: 1) the new state among (ready, blocked, zombie), 2) the number of read items, 3) the number of updated items (written), 4) the number of created items, and 5) the effective number of steps run.

Return type:

dict[str, int]
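As a rough illustration, a run_step implementation produces a dictionary shaped like the one below (the key names and the fixed work amount are illustrative, not the exact internal ones; the real Module API provides helpers to build this dictionary):

```python
def run_step(run_number, step_size, howlong):
    """Sketch of a run_step body: process up to step_size buffered items,
    then report what was done and the resulting state."""
    # Pretend only 100 input items were buffered, so we process that many.
    steps_run = min(step_size, 100)
    return {
        "next_state": "blocked" if steps_run < step_size else "ready",
        "reads": steps_run,      # number of items read
        "updates": 0,            # number of items updated (written)
        "creates": steps_run,    # number of items created
        "steps_run": steps_run,  # effective number of steps run
    }

info = run_step(run_number=1, step_size=1000, howlong=0.5)
```

Reporting the effective number of steps lets the scheduler's time predictor calibrate future step sizes against the howlong budget.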

static next_state(slot)#

Return state_ready if the slot has buffered information, or state_blocked otherwise.

Return type:

ModuleState

on_start_run(proc, remove=False)#

Register a callback to execute when the module starts to run.

The callback will be called with two arguments: the module and the run_number.

Parameters:

remove (bool (optional)) – Set to true to remove the callback from the list of callbacks.

Return type:

None

on_after_run(proc, remove=False)#

Register a callback to execute after the module runs.

The callback will be called with two arguments: the module and the run_number.

Parameters:

remove (bool (optional)) – Set to true to remove the callback from the list of callbacks.

Return type:

None

on_ending(proc, remove=False)#

Register a callback to execute when the module ends.

The callback will be called with two arguments: the module and the run_number.

Parameters:

remove (bool (optional)) – Set to true to remove the callback from the list of callbacks.

Return type:

None

last_update()#

Return the last time when the module was updated

Return type:

int

current_params()#

Return the current parameters

Return type:

Row

set_current_params(v)#

Change the current parameters

Return type:

dict[str, Any]

has_input()#

Return True if the module received something via a from_input() call. This is usually a flag set by from_input() and cleared by the following run_step(). See the Variable module.

Return type:

bool

filter_slot_columns(slot, indices=None, cols=None)#

Return the specified table filtered by the specified indices and limited to the columns of interest.

Return type:

BasePTable

Connection and Validation#

When declaring a new module, its input slots, output slots, and parameters can be declared using three decorators: @def_input, @def_output, and @def_parameter.

@progressivis.core.module.def_input(name, type=None, *, doc='', **kw)#

Class decorator to declare an input slot

Parameters:
  • name (str) – the slot name

  • type (Any) – the slot type

  • doc (str) – a docstring

Return type:

Callable[[Type[Module]], Type[Module]]

@progressivis.core.module.def_output(name, type=None, *, attr_name=None, custom_attr=False, doc='', **kw)#

Class decorator to declare an output slot. An output slot is always associated with an underlying attribute on the current module; this attribute holds the data exposed by the slot. By default, the attribute is created by the decorator and its name is the slot name.

Parameters:
  • name (str) – the slot name

  • type (Any) – the slot type

  • attr_name (Optional[str]) – optional name for the slot underlying attribute. When missing, the attribute name is the slot name. This is useful, for example, to avoid a naming conflict.

  • custom_attr (bool) – when True the underlying slot attribute is not created by the decorator. In this case the creation of the attribute is the responsibility of the module constructor. This is useful, for example, when the attribute requires special initialization.

  • doc (str) – a docstring

Return type:

Callable[[Type[Module]], Type[Module]]

@progressivis.core.module.def_parameter(name, type, value, *, doc='')#

Class decorator to declare a parameter

Parameters:
  • name (str) – the parameter name

  • type (Any) – the parameter type

  • value (Any) – a default value for the parameter

  • doc (str) – an optional docstring

Return type:

Callable[[Type[Module]], Type[Module]]

For example, a new module can be declared like this:

@def_parameter("history", np.dtype(int), 3)
@def_input("filenames", PTable, required=False, doc=FILENAMES_DOC)
@def_output("result", PTable, doc=RESULT_DOC)
class CSVLoader(Module):
  ...

Once a module is created, it should be connected to other modules. As shown in a previous example, the syntax is:

1  m = Max(name="max", scheduler=scheduler)
2  prt = Print(name="print_max", scheduler=scheduler)
3  m.input.table = table.output.result
4  prt.input.df = m.output.result

Here, on line 3, the input slot called table from the module m (the max module) is connected to the output slot called result from the module table (its creation is not shown in the example). On line 4, the input slot df of prt (the print_max module) is connected to the output slot result of the m module.

Slot names are checked at creation time: you cannot refer to the name of a slot that is not declared in the module. Slots are typed, and the types are checked at validation time. Some slots are required and others are optional; their presence is also checked at validation time.

Name, Groups and Tags#

Each module has a unique name, belongs to one group, and can have multiple tags associated with it. A name, group name, and tag name are simply strings.

At creation, a module is given a name, either explicitly if provided in the constructor (as in the example above, with names max and print_max), or automatically if not provided. This name is guaranteed to remain unique in a scheduler. If a name provided at creation time is already used, the creation throws an exception and the module is not created.

A group is also a string, associated with a module at creation time. It is used when several modules are created to work together and should terminate together. A group name can be specified at creation time either in the constructor or using the Module.grouped context manager to associate the group name of a specified module to a set of newly created modules:

scheduler = Scheduler.default
mymainmodule = ...
with mymainmodule.grouped():
    m = Max(name="max", scheduler=scheduler)
    prt = Print(name="print_max", proc=proc, scheduler=scheduler)
    ...

Tags are used to add a simple attribute to a module. Any string can be used as tag, but a few are reserved to specify particular aspects of a module: VISUALIZATION, INPUT, SOURCE, GREEDY, DEPENDENT.

Execution#

TODO

Control#

Callbacks:

ModuleProc = Callable[["Module", int], None]

on_start_run, on_after_run, on_ending

TODO

Connections (Slots)#

Internally, the connection between an input slot and an output slot is materialized by a Slot object. Each slot is typed to make sure an input slot is compatible with the output slot it connects to. At run time, slots carry data of the specified type. There are four main types of progressive data that can be used in slots, as described in Progressive Data Structures.

Slots are also used by modules to know what changed in the data structures since the last time they were run, using the three ChangeBuffer attributes created, updated, and deleted. Change management is described in detail in the next sections.
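The consume-a-bounded-chunk pattern of a ChangeBuffer can be sketched as follows (conceptual; the real buffers are backed by PIntSet and expose a richer API):

```python
class ChangeBuffer:
    """Conceptual sketch of a slot change buffer: accumulates row indices
    and hands them out in bounded batches."""
    def __init__(self):
        self._indices = set()

    def add(self, indices):
        self._indices |= set(indices)

    def any(self):
        return bool(self._indices)

    def next(self, length=None):
        """Consume up to `length` buffered indices (all of them if None)."""
        taken = sorted(self._indices)[:length]
        self._indices -= set(taken)
        return taken

created = ChangeBuffer()
created.add(range(5))           # the upstream module created rows 0..4
batch = created.next(length=3)  # a module consumes a bounded chunk per run
```

Bounding the batch size is what lets a module stay within its time budget: unprocessed indices simply remain buffered for the next run.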

class progressivis.core.slot.Slot#

A Slot manages one connection between two modules.

output_name#

The output slot name

output_module#

The output module

input_name#

The input slot name

input_module#

The input module

original_name: Optional[str]#

The original input slot name before it was renamed, for slots accepting multiple inputs

name()#

Return the name of the slot

Return type:

str

data()#

Return the data associated with this slot

Return type:

Any

scheduler()#

Return the scheduler associated with this slot

Return type:

Scheduler

last_update()#

Return the run_number of the last update for this slot

Return type:

int

to_json()#

Return a dictionary describing this slot, meant to be serialized in json.

Return type:

Dict[str, Any]

validate_types()#

Validate the types of the endpoints connected through this slot

Return type:

bool

create_changes(buffer_created=True, buffer_updated=False, buffer_deleted=False, buffer_exposed=False, buffer_masked=False)#

Create a ChangeManager associated with the type of the slot’s data.

Return type:

Optional[BaseChangeManager]

update(run_number, buffer_created=True, buffer_updated=True, buffer_deleted=True, buffer_exposed=True, buffer_masked=True, manage_columns=True)#

Compute the changes that occurred since this slot was last updated.

Return type:

None

reset()#

Reset the slot

Return type:

None

clear_buffers()#

Clear all the buffers

Return type:

None

has_buffered()#

Return True if any of the created/updated/deleted information is buffered

Return type:

bool

property created: ChangeBuffer#

Return the buffer for created rows

property updated: ChangeBuffer#

Return the buffer for updated rows

property deleted: ChangeBuffer#

Return the buffer for deleted rows

property base: _accessor#

Return an accessor

property selection: _accessor#

Return an accessor

property changemanager: BaseChangeManager | None#

Return the ChangeManager

static create_changemanager(datatype, slot, buffer_created, buffer_updated, buffer_deleted, buffer_exposed, buffer_masked)#

Create the ChangeManager responsible for this slot type or None if no ChangeManager is registered for that type.

Return type:

Optional[BaseChangeManager]

static add_changemanager_type(datatype, cls)#

Declare a ChangeManager class for a slot type

Return type:

None

Change Management#

Modules are run multiple times, each for a limited amount of time, to perform their computation. In between two runs, their input data can be changed by other modules upstream in the dataflow. Under the hood, ProgressiVis tracks what changed from one run to the next in the progressive data structures carried by slots. To simplify this change management, each data structure is treated as an indexed collection, with indices ranging from 0 to the length of the data structure; in other words, these data structures are indexed along a main axis. The recorded changes are limited to entries created at specified indices, entries deleted at specified indices, and entries updated at specified indices. For updated entries, the old values are not kept (so far).

Therefore, when a module runs, it can access its input data through the input slots, and can see what has changed since the last run. It can then decide to act according to the changes. For details, see the Custom Modules section.
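The delta between two runs can be sketched with plain set arithmetic (conceptual; ProgressiVis computes this incrementally rather than by comparing full index sets):

```python
def compute_delta(old_index, new_index, touched):
    """Sketch of change tracking between two runs: compare the index sets,
    with `touched` listing indices whose values were rewritten."""
    old, new = set(old_index), set(new_index)
    return {
        "created": sorted(new - old),            # indices added since last run
        "deleted": sorted(old - new),            # indices removed since last run
        "updated": sorted(set(touched) & old & new),  # rewritten, still present
    }

delta = compute_delta(old_index=[0, 1, 2, 3],
                      new_index=[1, 2, 3, 4, 5],
                      touched=[2, 4])
```

Note that index 4 is reported as created rather than updated: an entry counts as updated only if it already existed at the previous run.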

Progressive Data Structures#

ProgressiVis relies mainly on four specially designed data types:

  • tables/views

  • columns

  • dictionaries

  • bitmaps.

These data types are instrumented to keep track of the changes happening between two consecutive module runs.

Interaction#

Tables and views#

One of the most important progressive data structures in ProgressiVis is the table. It is similar to a pandas DataFrame, with several differences: some due to the progressive nature of the table, some by design, and some due to lack of time to provide an interface as extensive as pandas'.

Contrary to a pandas DataFrame, a PTable can grow and shrink efficiently, which is essential for progressive operations such as loading a table from a file progressively and computing derived values from it as it grows.

Tables come in two main concrete classes: PTable and PTableSelectedView.

ProgressiVis has been designed with user interaction in mind. It supports efficient dynamic filtering of large data tables to restrict the visualization or analyses to subsets of the loaded data. A PTableSelectedView is simply a PTable filtered by a PIntSet. It is used in many places, either for user filtering or for synchronizing multiple data structures to the same set of valid indices.
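The view mechanism can be sketched as a base mapping filtered by a set of visible row ids (conceptual; the real PTableSelectedView shares the base PTable's columns and uses a PIntSet, not a Python set):

```python
class SelectedView:
    """Conceptual sketch of a table view filtered by a set of row ids."""
    def __init__(self, base, selection):
        self.base = base            # mapping from row id to row data
        self.selection = selection  # set of visible row ids

    def __len__(self):
        return len(self.selection)

    def __getitem__(self, loc):
        if loc not in self.selection:
            raise KeyError(loc)     # hidden rows are invisible through the view
        return self.base[loc]

base = {0: "a", 1: "b", 2: "c", 3: "d"}
view = SelectedView(base, selection={1, 3})
```

Because the view only stores a selection, narrowing or widening a filter means updating one set instead of copying table data.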

class progressivis.table.BasePTable#

Base class for progressivis tables and table-views (PTable, PTableSelectedView etc.)

Warning

Do not instantiate this class directly!

drop(index, truncate=False)#

Remove rows by specifying their indices.

Parameters:
  • index (Union[slice, Sequence[int]]) – indices of rows to be dropped

  • truncate (bool) –

    manage dropped indices:

    • if True, dropped indices greater than the largest remaining (non-dropped) index can be reused for rows added later

    • if False, no dropped index will be reused

    Warning

    In any case, dropped indices smaller than the largest remaining (non-dropped) index will not be reused

Return type:

None

property loc#

Return a locator object for accessing a group of rows and columns by indices and column names. The following two syntaxes are allowed:

  • .loc[row-selection]

  • .loc[row-selection, column-selection]

Allowed inputs for row selection are:

  • A single index

  • A list or array of indices, e.g. [1, 3, 5].

  • A slice object with indices, e.g. 1:5.

Allowed inputs for column selection are:

  • A single column name or index

  • A list or array of column names or indices, e.g. ['b', 'd', 'f'] or [1, 3, 5].

  • A slice object with column name or indices, e.g. 'b':'f' or 1:5.

Warning

Just like pandas DataFrame slices (but contrary to usual Python slices), both the start and the stop bounds are included in the interval

Examples

Getting values

>>> from progressivis.table.table import PTable
>>> data = dict(i=[29, 45, 12, 20, 70],
...             j=[-95, -47, -11, -83, -68],
...             s=["t", "a", "b", "l", "e"],
...             f=[0.741, 0.0812, 0.284, 0.775, 0.884],
...             g=[-0.320, -0.031, -0.717, -0.863, -0.8087]
... )
>>> pt = PTable("pt", data=data)
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          29|         -95|           t|       0.741|       -0.32|
           1|          45|         -47|           a|      0.0812|      -0.031|
           2|          12|         -11|           b|       0.284|      -0.717|
           3|          20|         -83|           l|       0.775|      -0.863|
           4|          70|         -68|           e|       0.884|     -0.8087|
>>>

Single row

>>> pt.loc[2]
<progressivis.table.row.Row object at 0x7f8907fe2c10>
>>> pt.loc[2].to_dict()
{'i': 12, 'j': -11, 's': 'b', 'f': 0.284, 'g': -0.717}
>>>

Single row/single column

>>> pt.loc[2, "f"]
0.284
>>>

Slicing rows, keeping all columns

>>> pt.loc[1:3]
BasePTable("anonymous", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[3]
   Index    |     i      |     j      |     s      |     f      |     g      |
           1|          45|         -47|           a|      0.0812|      -0.031|
           2|          12|         -11|           b|       0.284|      -0.717|
           3|          20|         -83|           l|       0.775|      -0.863|
>>>

Fancy indexing on rows, keeping all columns

>>> pt.loc[[1, 3]]
BasePTable("anonymous", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[2]
   Index    |     i      |     j      |     s      |     f      |     g      |
           1|          45|         -47|           a|      0.0812|      -0.031|
           3|          20|         -83|           l|       0.775|      -0.863|
>>>

All rows, single column

>>> pt.loc[:, "f"]
BasePTable("anonymous", dshape="{f: float64}")[5]
   Index    |     f      |
           0|       0.741|
           1|      0.0812|
           2|       0.284|
           3|       0.775|
           4|       0.884|
>>>

All rows, list of names for columns

>>> pt.loc[:, ["j", "f"]]
BasePTable("anonymous", dshape="{j: int32, f: float64}")[5]
   Index    |     j      |     f      |
           0|         -95|       0.741|
           1|         -47|      0.0812|
           2|         -11|       0.284|
           3|         -83|       0.775|
           4|         -68|       0.884|
>>>

All rows, list of indices for columns

>>> pt.loc[:, [1, 3]]
BasePTable("anonymous", dshape="{j: int32, f: float64}")[5]
   Index    |     j      |     f      |
           0|         -95|       0.741|
           1|         -47|      0.0812|
           2|         -11|       0.284|
           3|         -83|       0.775|
           4|         -68|       0.884|
>>>

All rows, range of names (slicing) for columns

>>> pt.loc[:, "j":"f"]
BasePTable("anonymous", dshape="{j: int32, s: string, f: float64}")[5]
   Index    |     j      |     s      |     f      |
           0|         -95|           t|       0.741|
           1|         -47|           a|      0.0812|
           2|         -11|           b|       0.284|
           3|         -83|           l|       0.775|
           4|         -68|           e|       0.884|
>>>

All rows, range of indices (slicing) for columns

>>> pt.loc[:, 1:3]
BasePTable("anonymous", dshape="{j: int32, s: string, f: float64}")[5]
   Index    |     j      |     s      |     f      |
           0|         -95|           t|       0.741|
           1|         -47|           a|      0.0812|
           2|         -11|           b|       0.284|
           3|         -83|           l|       0.775|
           4|         -68|           e|       0.884|
>>>

Setting values

Setting unique value

>>> pt.loc[3, "f"] = 0.0
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          29|         -95|           t|       0.741|       -0.32|
           1|          45|         -47|           a|      0.0812|      -0.031|
           2|          12|         -11|           b|       0.284|      -0.717|
           3|          20|         -83|           l|         0.0|      -0.863|
           4|          70|         -68|           e|       0.884|     -0.8087|
>>>

Broadcasting a value over a column

>>> pt.loc[:, "f"] = 0.
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          29|         -95|           t|         0.0|       -0.32|
           1|          45|         -47|           a|         0.0|      -0.031|
           2|          12|         -11|           b|         0.0|      -0.717|
           3|          20|         -83|           l|         0.0|      -0.863|
           4|          70|         -68|           e|         0.0|     -0.8087|

Broadcasting a value over a list of columns

>>> pt.loc[:, ["i", "j"]] = 42
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          42|          42|           t|         0.0|       -0.32|
           1|          42|          42|           a|         0.0|      -0.031|
           2|          42|          42|           b|         0.0|      -0.717|
           3|          42|          42|           l|         0.0|      -0.863|
           4|          42|          42|           e|         0.0|     -0.8087|

Setting a row

>>> pt.loc[2, :] = [42, -42, "B", 4.2, -4.2]
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          42|          42|           t|         0.0|       -0.32|
           1|          42|          42|           a|         0.0|      -0.031|
           2|          42|         -42|           B|         4.2|        -4.2|
           3|          42|          42|           l|         0.0|      -0.863|
           4|          42|          42|           e|         0.0|     -0.8087|
>>>

Setting some columns in a row

>>> pt.loc[3, ["i", "s"]] = [0, "L"]
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          42|          42|           t|         0.0|       -0.32|
           1|          42|          42|           a|         0.0|      -0.031|
           2|          42|         -42|           B|         4.2|        -4.2|
           3|           0|          42|           L|         0.0|      -0.863|
           4|          42|          42|           e|         0.0|     -0.8087|
>>>

Setting many values in a column

>>> pt.loc[1:3, "i"] = [43, 44, 45]
>>> pt
PTable("pt", dshape="{i: int32, j: int32, s: string, f: float64, g: float64}")[5]
   Index    |     i      |     j      |     s      |     f      |     g      |
           0|          42|          42|           t|         0.0|       -0.32|
           1|          43|          42|           a|         0.0|      -0.031|
           2|          44|         -42|           B|         4.2|        -4.2|
           3|          45|          42|           L|         0.0|      -0.863|
           4|          42|          42|           e|         0.0|     -0.8087|
>>>
property at#

Return an object for indexing values using ids

property size: int#

Return the size of this table, which is the number of rows

property is_identity: bool#

Return True if the index is using the identity mapping

property last_id: int#

Return the last id of this table

property last_xid: int#

Return the last eXisting id of this table

width(colnames=None)#

Return the effective width (number of columns) of the table.

Since a column can be multidimensional, the effective width of a table is the sum of the effective width of each of its columns.

Parameters:

colnames (list or None) – The optional list of columns to use for counting, or all the columns when not specified or None.

Return type:

int

property shape: Tuple[int, ...]#

Return the shape of this table as if it were a numpy array

to_json(**kwds)#

Return a dictionary describing the contents of this table.

Return type:

Dict[str, Any]

to_dict(orient='dict', columns=None)#

Return a dictionary describing the contents of this table.

Parameters:
  • orient ({'dict', 'list', 'split', 'rows', 'datatable', 'records', 'index'}) – TODO

  • columns (list or None) – TODO

Return type:

Any

column_offsets(columns, shapes=None)#

Return the offsets of each column considering columns can have multiple dimensions

Return type:

List[int]

property columns: List[str]#

Return the list of column names in this table

column_index(name)#

Return the index of the specified column in this table

Return type:

int

property index: PIntSet#

Return the object in charge of indexing this table

property ncol: int#

Return the number of columns (same as len(table.columns))

property nrow: int#

Return the number of rows (same as len(table))

property name: str#

Return the name of this table

property dshape: DataShape#

Return the datashape of this table

property base: BasePTable | None#

Return the base table for views, or None if the table is not a view

property changes: PTableChanges | None#

Return the PTableChanges manager associated with this table, or None

compute_updates(start, now, mid, cleanup=True)#

Compute the updates (delta) that happened to this table since the last call.

Parameters:
  • start (int) – start is interpreted as a virtual time for the last call

  • now (int) – now is interpreted as a virtual time for now

  • mid (str) – An identifier for the object that will ask for updates, usually the name of a slot.

Return type:

Optional[IndexUpdate]

Returns:

None or an IndexUpdate structure which describes the list of rows created, updated, and deleted.

row(loc)#

Return a Row object wrapping the loc

Return type:

Optional[Row]

iterrows()#

Return an iterator returning rows and their ids

Return type:

Iterator[Optional[Row]]

last(key=None)#

Return the last row

Return type:

Any

columns_common_dtype(columns=None)#

Return the dtype that BasePTable.to_array would return.

Parameters:

columns (a list or None) – the columns to extract or, if None, all the table columns

Return type:

dtype[Any]

to_array(locs=None, columns=None, ret=None)#

Convert this table to a numpy array

Parameters:
  • locs (Any) – The rows to extract. Locs can be specified with multiple formats: integer, list, numpy array, Iterable, or slice.

  • columns (Optional[List[str]]) – the columns to extract or, if None, all the table columns

  • return_indices – if True, returns a tuple with the indices of the returned values as indices, followed by the array

  • ret (Optional[ndarray[Any, Any]]) – if None, the returned array is allocated, otherwise, ret is reused. It should be an array of the right dtype and size otherwise it is ignored.

Return type:

ndarray[Any, Any]

class progressivis.table.IndexPTable#

Base class for physical tables (currently PTable)

It implements index management.

Warning

Do not instantiate this class directly!

property index: PIntSet#

Return the object in charge of indexing this table

property last_id: int#

Return the last id of this table

iterrows()#

Return an iterator returning rows and their ids

Return type:

Iterator[Optional[Row]]

property last_xid: int#

Return the last eXisting id of this table

class progressivis.table.PTable#

Create a PTable data structure, made of a collection of columns.

A PTable is similar to a pandas DataFrame or an R data.frame, but it is column-based and supports fast addition of items.

Example

>>> from progressivis.table import PTable
>>> t = PTable('my-table', dshape='{a: int32, b: float32, c: bool}', create=True)
>>> len(t)
0
>>> t.columns
['a', 'b', 'c']
__init__(name, data=None, dshape=None, fillvalues=None, storagegroup=None, chunks=None, create=None, indices=None)#
Parameters:
  • name (Optional[str]) – the name of the table

  • data (Any) –

    optional container: data that will be appended to the table. It can be of multiple types:

    • PTable: another table is used to fill-up this table

    • pandas.DataFrame: a Pandas DataFrame is copied to this table

    • numpy.ndarray: a numpy array is copied. The dshape should be provided

  • dshape (Union[str, DataShape, None]) – data shape such as {'a': int32, 'b': float64, 'c': string} The column names and types as specified by the datashape library.

  • fillvalues (Optional[Dict[str, Any]]) – the default values of the columns. Each column is created with a default fillvalue. This parameter can specify the fillvalue of each column in three formats: a single value, used by all the columns; a dictionary, associating a column name to a value; the ‘*’ entry in a dictionary, giving the default fillvalue for the columns not listed.

  • storagegroup (Optional[Group]) – a factory used to create the columns. When not specified or None, the default storage is used; otherwise, the storagegroup is specified as a Group.

  • chunks (Union[None, int, Dict[str, Union[int, Tuple[int, ...]]]]) – the specification of the chunking of columns when the storagegroup supports it. Like the fillvalue argument, it can be one value or a dict.

  • create (Optional[bool]) – forces the creation of the table. Because the storagegroup allows persistence, a table with the same name may already exist in the storagegroup. With create=False, the previous value is loaded, whereas with create=True it is replaced.

  • indices (Optional[Any]) – the indices of the rows appended when data is specified, in case the table contents has to be joined with another table.

property name: str#

Return the name of this table

property storagegroup: Group#

Return the storagegroup of this table

touch_rows(loc=None)#

Signals that the values at loc have been changed

Return type:

None

append(data, indices=None)#

Append rows of tabular data (i.e., PTable, pandas.DataFrame, pyarrow.RecordBatch, or dict of arrays) to the end of self. The data has to be compatible. It can come from multiple sources [more details needed].

Parameters:
  • data (Any) – data to be appended

  • indices (Optional[Any]) – allows forcing the indices of the appended rows

Return type:

None

add(row, index=None)#

Add one row to the end of self.

Parameters:
  • row (Any) – the row to be added (typically a dict or a sequence)

  • index (Optional[Any]) – allows forcing the index value of the added row

Return type:

int

static from_array(array, name=None, columns=None, offsets=None, dshape=None, **kwds)#

Offsets is a list of indices or pairs.

Return type:

PTable

eval(expr, inplace=False, name=None, result_object=None, locs=None, as_slice=True)#

Evaluate the expr on columns and return the result.

Parameters:
  • inplace (bool) – optional boolean; apply the changes in place

  • name (Optional[str]) – string used when a new table/view is created, otherwise ignored

  • result_object (Optional[str]) – possible values: {‘raw_numexpr’, ‘index’, ‘view’, ‘table’} when expr is conditional. Note: a ‘view’ result is not guaranteed: it may be ‘table’ when the computed index is not sliceable, and ‘table’ or None when expr is an assignment. Default values: ‘index’ when expr is conditional; always None when inplace=True, otherwise a new table is returned.

Return type:

Any

class progressivis.table.PTableSelectedView#

Virtual table built on top of a PTable or a PTableSelectedView

__init__(base, selection=slice(0, None, None), columns=None, computed=None)#
Parameters:
  • base (BasePTable) – the table (stored or virtual) on which the current view is built

  • selection (Union[PIntSet, slice]) – indices to be part of the view

  • columns (Optional[List[str]]) – selection of columns to be included in the view

  • computed (Optional[Dict[str, Any]]) – computed columns (as dict keys) and their associated expressions (as dict values)

Columns#

A table is a collection of named and typed columns. In a table, all the columns have the same length and all the elements in a column have the same type.

class progressivis.table.BasePColumn#

Base class for columns.

property loc: Any#

Return the accessor by id

property name: str#

Return the name

property index: IndexPTable#

Return the index

property base: BasePColumn | None#

Return the base column or None

abstract property size: int#

Return the size of the column

abstract property fillvalue: Any#

Return the default value for elements created when the column is enlarged

id_to_index(loc, as_slice=True)#

Convert an identifier to an index

Return type:

Any

update()#

Synchronize the column size with its index if needed. This method can be called safely at any time; it does nothing if nothing needs to be done.

Return type:

None

info_contents()#

Return a string describing the contents of the column

Return type:

str

has_freelist()#

Return True if the column manages a free list

Return type:

bool

tolist()#

Return a list from the values of the column

Return type:

List[Any]

read_direct(array, source_sel=None, dest_sel=None)#

Read data from column into an existing NumPy array.

Selections must be the output of numpy.s_[<args>] or slice.

Return type:

None

abstract property value: ndarray[Any, Any]#

Return a numpy array-compatible object containing the values

property values: ndarray[Any, Any]#

Synonym for value

abstract resize(newsize)#

Resize this column

Return type:

None

abstract property shape: Tuple[int, ...]#

Return the shape of that column

abstract set_shape(shape)#

Set the shape of that column. The semantics can differ from that of numpy.

Return type:

None

abstract property maxshape: Tuple[int, ...]#

Return the maximum shape

abstract property dtype: dtype[Any]#

Return the dtype

abstract property dshape: DataShape#

Return the datashape

abstract property chunks: Tuple[int, ...]#

Return the chunk size

last()#

Return the last element

Return type:

Any

property changes: PTableChanges | None#

Return the ChangeManager associated with the index of this column

compute_updates(start, now, mid, cleanup=True)#

Return the updates of this column managed by the index

Return type:

Optional[IndexUpdate]

unary(operation, **kwargs)#

Unary function manager

Return type:

ndarray[Any, Any]

binary(operation, other, **kwargs)#

Binary function manager

Return type:

ndarray[Any, Any]

any(**kwargs)#

Return True if any element is not False

Return type:

bool

all(**kwargs)#

Return True if all the elements are True

Return type:

bool

min(**kwargs)#

Return the min value

Return type:

Any

max(**kwargs)#

Return the max value

Return type:

Any

class progressivis.table.PColumn#
__init__(name, index, base=None, storagegroup=None, dshape=None, fillvalue=None, shape=None, chunks=None, indices=None, data=None)#

Create a new column.

If index is None and self.index is None, a new index and dataset are created.

property chunks: Tuple[int, ...]#

Return the chunk size

property shape: Tuple[int, ...]#

Return the shape of that column

set_shape(shape)#

Set the shape of that column. The semantics can differ from that of numpy.

Return type:

None

property maxshape: Tuple[int, ...]#

Return the maximum shape

property dtype: dtype[Any]#

Return the dtype

property dshape: DataShape#

Return the datashape

property size: int#

Return the size of the column

property fillvalue: Any#

Return the default value for elements created when the column is enlarged

property value: Any#

Return a numpy array-compatible object containing the values

read_direct(array, source_sel=None, dest_sel=None)#

Read data from column into an existing NumPy array.

Selections must be the output of numpy.s_[<args>] or slice.

Return type:

None

resize(newsize)#

Resize this column

Return type:

None

Computed columns#

In addition to stored columns, tables can contain virtual columns computed from the contents of other columns. To create a computed column, you need to instantiate an object of class SingleColFunc, MultiColFunc or MultiColExpr and add it to the computed dictionary of the table. Subsequently, a view constructed on this table will be able to utilize the new column.

class progressivis.table.compute.SingleColFunc#

Instances of this class supply the information for constructing a computed table column. The column is built over another (stored or computed) column using a universal function (numpy.ufunc), a vectorized function (numpy.vectorize), or a custom function compatible with numpy.apply_along_axis.

Example

Computed column using a universal function

>>> from progressivis.table import PTable
>>> from progressivis.table.compute import SingleColFunc
>>> import numpy as np
>>> t = PTable("t", dshape="{a: int, b: float32}", create=True)
>>> t.resize(5)
>>> np.random.seed(42)
>>> t["a"] = np.random.randint(100, size=5)
>>> fvalues = np.array(np.random.rand(20), np.float32)
>>> t["b"] = np.random.rand(5)
>>> t
PTable("t", dshape="{a: int32, b: float32}")[5]
   Index    |     a      |     b      |
           0|          51|  0.23277134|
           1|          92| 0.090606436|
           2|          14|  0.61838603|
           3|          71|    0.382462|
           4|          60|   0.9832309|
>>> colfunc = SingleColFunc(func=np.arcsin, base="b")
>>> t.computed["arcsin_b"] = colfunc
>>> t.loc[:, :]
BasePTable("anonymous", dshape="{a: int32, b: float32, arcsin_b: float32}")[5]
   Index    |     a      |     b      |  arcsin_b  |
           0|          51|  0.23277134|  0.23492633|
           1|          92| 0.090606436|  0.09073087|
           2|          14|  0.61838603|   0.6666873|
           3|          71|    0.382462|  0.39245942|
           4|          60|   0.9832309|    1.387405|
>>>
base: str = ''#

column(s) to be provided as input(s)

func()#

function to be applied to the elements of the input column.

dshape: Optional[DataShape] = None#

column datashape as specified by the datashape library

dtype: Optional[dtype[Any]] = None#

column datatype

xshape: Tuple[int, ...] = ()#

column shape excluding the first axis (axis=0). Useful only when column elements are multidimensional

class progressivis.table.compute.MultiColFunc#

Instances of this class supply the information for constructing a computed table column based on two or more other columns.

Example

>>> from progressivis.table import PTable
>>> from progressivis.table.compute import MultiColFunc
>>> import numpy as np
>>> from typing import Any, Dict
>>> t = PTable("t", dshape="{a: int, b: float32}", create=True)
>>> t.resize(5)
>>> np.random.seed(42)
>>> t["a"] = np.random.randint(100, size=5)
>>> fvalues = np.array(np.random.rand(20), np.float32)
>>> t["b"] = np.random.rand(5)
>>> t
PTable("t", dshape="{a: int32, b: float32}")[5]
   Index    |     a      |     b      |
           0|          51|  0.23277134|
           1|          92| 0.090606436|
           2|          14|  0.61838603|
           3|          71|    0.382462|
           4|          60|   0.9832309|
>>> def _axb(index, local_dict: Dict[str, Any]) -> Any:
...     return local_dict["a"] * local_dict["b"]
...
>>> colfunc = MultiColFunc(func=_axb, base=["a", "b"], dtype=np.dtype("float32"))
>>> t.computed["a_x_b"] = colfunc
>>> t.loc[:, :]
BasePTable("anonymous", dshape="{a: int32, b: float32, a_x_b: float32}")[5]
   Index    |     a      |     b      |   a_x_b    |
           0|          51|  0.23277134|[11.8713381.|
           1|          92| 0.090606436|[8.33579212]|
           2|          14|  0.61838603|[8.65740442]|
           3|          71|    0.382462|[27.1548016.|
           4|          60|   0.9832309|[58.9938533.|
>>>
base: list[str]#

columns to be provided as inputs

func(b)#

function reference. The function must have the following signature:

def some_function(index: Any, local_dict: Dict[str, Any]) -> Any

where:

  • index is the index of the column

  • local_dict contains the input columns (the keys are the column names)

dshape: Optional[DataShape] = None#

column datashape as specified by the datashape library

dtype: Optional[dtype[Any]] = None#

column datatype

xshape: Tuple[int, ...] = ()#

column shape excluding the first axis (axis=0). Useful only when column elements are multidimensional

class progressivis.table.compute.MultiColExpr#

Instances of this class supply the information for constructing a computed table column based on two or more other columns. Instead of a Python function, it uses a numexpr expression.

Example

>>> from progressivis.table import PTable
>>> from progressivis.table.compute import MultiColExpr
>>> import numpy as np
>>> from typing import Any, Dict
>>> t = PTable("t", dshape="{a: int, b: float32}", create=True)
>>> t.resize(5)
>>> np.random.seed(42)
>>> t["a"] = np.random.randint(100, size=5)
>>> fvalues = np.array(np.random.rand(20), np.float32)
>>> t["b"] = np.random.rand(5)
>>> t
PTable("t", dshape="{a: int32, b: float32}")[5]
   Index    |     a      |     b      |
           0|          51|  0.23277134|
           1|          92| 0.090606436|
           2|          14|  0.61838603|
           3|          71|    0.382462|
           4|          60|   0.9832309|
>>> colexpr = MultiColExpr(expr="a*b", base=["a", "b"], dtype=np.dtype("float32"))
>>> t.computed["a_x_b"] = colexpr
>>> t.loc[:, :]
BasePTable("anonymous", dshape="{a: int32, b: float32, a_x_b: float32}")[5]
   Index    |     a      |     b      |   a_x_b    |
           0|          51|  0.23277134| [11.871338]|
           1|          92| 0.090606436|  [8.335793]|
           2|          14|  0.61838603|  [8.657404]|
           3|          71|    0.382462| [27.154802]|
           4|          60|   0.9832309| [58.993855]|
>>>
base: list[str]#

columns to be provided as inputs

expr: str = ''#

numexpr expression

dshape: Optional[DataShape] = None#

column datashape as specified by the datashape library

dtype: Optional[dtype[Any]] = None#

column datatype

xshape: Tuple[int, ...] = ()#

column shape excluding the first axis (axis=0). Useful only when column elements are multidimensional

The PDict class#

Progressive dictionaries behave like regular dictionaries, but their changes are tracked.

class progressivis.utils.psdict.PDict#

progressive dictionary

__init__(other=None, **kwargs)#
key_of(id)#

Return (key, status), where key is the key associated with id and status is one of {active|deleted}.

Return type:

Tuple[str, str]

clear()#
Return type:

None

The PIntSet class#

A PIntSet is a set of integer values implemented efficiently. They are used in many places in ProgressiVis, e.g., for indexing tables and for representing masks inside a PTableSelectedView.

class progressivis.core.pintset.PIntSet#

Derives from an efficient and light-weight ordered set of 32-bit integers.

__init__(values=None, copy_on_write=False, optimize=True)#
clear()#

Clear the PIntSet in-place

Return type:

None

update(values)#

Add new values from either a PIntSet, an array, a slice, or an Iterable

Return type:

None

pop(length=1)#

Remove one or many items and return them as a PIntSet

Return type:

PIntSet

to_slice_maybe()#

Convert this PIntSet to a slice if possible, or return self

Return type:

Union[slice, PIntSet]

static aspintset(x)#

Try to coerce the value as a PIntSet

Return type:

PIntSet

flip(start, end)#

Compute the negation of the PIntSet within the specified interval.

Return type:

PIntSet

static union(*pintsets)#

Return the union of the pintsets.

Return type:

PIntSet

static intersection(*pintsets)#

Return the intersection of the pintsets.

Return type:

PIntSet

static deserialize(buff)#

Generate a PIntSet from the given serialization.

Return type:

PIntSet

Decorators#

Method Decorators#

Class Decorators#