Basic implementation of sliding page widget.
import threading
import itertools
from math import ceil
from collections import OrderedDict
from functools import total_ordering
from gi.repository import Clutter, GObject
import pisak
from pisak import res, logger, exceptions, properties, scanning, layout, \
unit, configurator, unit
_LOG = logger.get_logger(__name__)
[docs]class DataItem:
Single data item that can be put on a `data` list owned by
a `DataSource` instance. Data items are then used to produce some
full-feature widgets or other objects managed by an application.
Data items can be compared with each another and thus can be sorted.
:param content: proper data content.
:param cmp_key: key used for comparisons.
def __init__(self, content, cmp_key, flags=None):
self.content = content
self.cmp_key = cmp_key
self.flags = flags or {} # extra flags that can be set on a data item
def _is_valid_operand(self, other):
return isinstance(other, DataItem)
def __lt__(self, other):
if not self._is_valid_operand(other):
return NotImplementedError
return self.cmp_key < other.cmp_key
def __bool__(self):
return bool(self.content)
[docs]class LazyWorker:
Lazy worker class. Loads data in a separate thread.
def __init__(self, src):
self._src = src
self._step = 10
self._worker = None
self._running = True
def _lazy_work(self):
Main worker function that loads all the data at once, in small portions.
Each portion is '_step' number of elements long. Loads one
portion of elements with identifiers from the front
of the ids list and one portion from the back, in turns.
if self._src.lazy_offset is not None:
any_left = True
while any_left and self._running:
any_left = self._load_portion_by_number(
self._src.lazy_offset, self._step)
self._src.lazy_offset += self._step
flat = list(range(0, len(self._src._ids), self._step))
half_len = int(len(flat)/2)
mixed = [idx for idx in itertools.chain(*itertools.zip_longest(
flat[ :half_len], reversed(flat[half_len: ]))) if idx is not None]
for idx in mixed:
if not self._running:
self._load_portion_by_ids(ids=self._src._ids[idx : idx+self._step])
def _load_portion_by_ids(self, ids):
Load some portion of data items with the given identifiers.
:param ids: list of ids specifying which data items should be loaded.
self._src._lazy_data.update(list(zip(map(str, ids),
self._src.data = self._src.produce_data(
[(val, None) for val in list(self._src._lazy_data.values())],
def _load_portion_by_number(self, offset, number):
data = self._src._query_portion_of_data_by_number(offset, number)
if data:
ids = list(range(offset, offset + len(data)))
self._src._lazy_data.update(list(zip(map(str, ids), data)))
self._src.data = self._src.produce_data(
[(val, None) for val in list(self._src._lazy_data.values())],
return data
def step(self):
Integer, number of data items loaded at each step.
After setting this, data is started to being loaded.
return self._step
def step(self, value):
self._step = value
[docs] def stop(self):
Stop the loader, stop any on-going activities.
self._running = False
if self._worker is not None:
if self._worker.is_alive():
self._worker = None
[docs] def start(self):
Start the loader.
self._running = True
if not self._worker:
self._worker = threading.Thread(target=self._lazy_work,
_LOG.warning('Lazy loader has been started already.')
[docs]class DataSource(GObject.GObject, properties.PropertyAdapter,
Base class for the PISAK data sources.
It can work in a `lazy_loading` mode of operation by setting this property
to True. However, it should be noted that only few of all the
methods are compatible with the 'lazy' mode and are able to take
advantage of the functionality it provides.
So far, these are: `get_items_forward` and `get_items_backward`.
Before enabling the 'lazy' mode, there should also be certain things
supplied by a child class.
__gtype_name__ = "PisakDataSource"
__gsignals__ = {
"data-is-ready": (
GObject.SIGNAL_RUN_FIRST, None, ()),
"length-changed": (
'reload': (
GObject.SIGNAL_RUN_FIRST, None, ())
__gproperties__ = {
"item_ratio_width": (
GObject.TYPE_FLOAT, None, None,
0, 1., 0, GObject.PARAM_READWRITE),
"item_ratio_height": (
GObject.TYPE_FLOAT, None, None,
0, 1., 0, GObject.PARAM_READWRITE),
"item_ratio_spacing": (
GObject.TYPE_FLOAT, None, None, 0, 1., 0,
"item_preview_ratio_width": (
GObject.TYPE_FLOAT, None, None, 0, 1., 0,
"item_preview_ratio_height": (
GObject.TYPE_FLOAT, None, None, 0, 1., 0,
"custom_topology": (
"", "", False,
def __init__(self):
self._length = 0
self._lazy_loading = False
self.page_idx = None
self.custom_topology = False
self.from_idx = 0
self.to_idx = 0
self.data_sets_count = 0
self.data_generator = None
self._target_spec = None
self._data = []
self._data_set_idx = None
self.item_handler = None
self._data_sorting_key = None
# synchronization for an access to the `data` buffer.
self._lock = threading.RLock()
# something to do when new data is available..
self.on_new_data = None
self.data_sets_ids_list = None
def target_spec(self):
Specification of a target that the `DataSource` serves as a data supplier for.
If the lazy loading mode is on then the lazy loader is triggered here.
return self._target_spec
def target_spec(self, value):
self._target_spec = value
if self.lazy_loading:
self._lazy_loader.step = value['columns'] * value['rows']
def custom_topology(self):
Whether the custom topology mode of the elements positioning
should be applied, boolean.
return self._custom_topology
def custom_topology(self, value):
self._custom_topology = value
def item_ratio_spacing(self):
Value relative to the item width/height.
return self._item_ratio_spacing
def item_ratio_spacing(self, value):
self._item_ratio_spacing = value
def item_ratio_height(self):
Item widget height, as a fraction of the whole screen height.
return self._item_ratio_height
def item_ratio_height(self, value):
self._item_ratio_height = value
def item_ratio_width(self):
Item widget width, as a fraction of the whole screen width.
return self._item_ratio_width
def item_ratio_width(self, value):
self._item_ratio_width = value
def item_preview_ratio_height(self):
Value relative to the item height.
return self._item_preview_ratio_height
def item_preview_ratio_height(self, value):
self._item_preview_ratio_height = value
def item_preview_ratio_width(self):
Value relative to the item width.
return self._item_preview_ratio_width
def item_preview_ratio_width(self, value):
self._item_preview_ratio_width = value
def data_set_idx(self):
Idx of the current data set.
return self._data_set_idx
def data_set_idx(self, value):
self._data_set_idx = value
if self.data_generator is not None:
if self.data_sets_ids_list and value <= \
value = self.data_sets_ids_list[value-1]
elif value <= self.data_sets_count:
return # invalid params for the data generator
self.data = self.data_generator(value)
def data(self):
List of some arbitrary data items. Each single item should
be an instance of the `DataItem` class.
with self._lock:
return self._data.copy()
def data(self, value):
with self._lock:
self._data = value
self._length = len(value)
self.emit('length-changed', self._length)
[docs] def reload(self):
def length(self):
Total length of the whole available data set.
return self._length
def _generate_items_normal(self):
Generate items and structure them into a nested list.
data = self.data
rows, cols = self.target_spec["rows"], self.target_spec["columns"]
to = self.to_idx
if self.from_idx + rows*cols >= self._length:
to = self.from_idx + rows*cols
items = []
idx = 0
for index in range(self.from_idx, to):
if idx % cols == 0:
row = []
idx += 1
if index < self._length and index < self.to_idx:
item = self._produce_item(data[index])
elif index > self._length or index >= self.to_idx:
item = Clutter.Actor()
return items
def _generate_items_flat(self):
Generate items and place them all into a flat list.
items = []
for index in range(self.from_idx, self.to_idx):
if index < self._length:
item = self._produce_item(self.data[index])
return items
def _prepare_item(self, item):
Adjust the given item according to the target specification in order
to fit its space. If no target specification is set then nothing
happens. Then, set all the proper attributes of the item.
:param item: single item generated from data
if self.target_spec is not None:
columns = self.target_spec["columns"]
rows = self.target_spec["rows"]
target_spacing = self.target_spec["spacing"]
item_width = (self.target_spec["width"] - (columns-1) *
target_spacing) / columns
item_height = (self.target_spec["height"] - (rows-1) *
target_spacing) / rows
item.set_size(item_width, item_height)
for prop in ("item_ratio_width", "item_ratio_height",
"item_ratio_spacing", "item_preview_ratio_width",
if hasattr(self, prop):
value = getattr(self, prop)
if prop in ("item_ratio_width" or "item_ratio_height"):
setattr(item, prop.strip("item_"), value)
if prop == "ratio_spacing" and hasattr(item, "box"):
setattr(item.box, "spacing", value * item.get_height())
elif prop == "preview_ratio_width" and hasattr(item, "preview"):
setattr(item.preview, "width", value * item.get_width())
elif prop == "preview_ratio_height" and hasattr(item, "preview"):
setattr(item.preview, "height", value * item.get_width())
def _produce_item(self, data_item):
raise NotImplementedError
def _prepare_filler(self, filler):
filler.set_background_color(Clutter.Color.new(255, 255, 255, 0))
[docs] def query_items_forward(self, count):
Query a given number of forward items generated from data.
Method is compatible with a normal topology mode but NOT with
a custom one. Data items are picked from a flat data list.
:param count: number of items to be returned.
:return: list of data items or None if in the lazy loading mode.
self.from_idx = self.to_idx % self._length if \
self._length > 0 else 0
self.to_idx = min(self.from_idx + count, self._length)
if self.lazy_loading:
return self._generate_items_normal()
[docs] def query_items_backward(self, count):
Query a given number of backward items generated from data.
Method is compatible with a normal topology mode but NOT with
a custom one. Data items are picked from a flat data list.
:param count: number of items to be returned.
:return: list of data items or None if in the lazy loading mode.
rows, cols = self.target_spec["rows"], self.target_spec["columns"]
self.to_idx = self.from_idx or self._length
if self.to_idx < count:
self.from_idx = self._length - count + self.to_idx
elif self.to_idx == self._length and self._length % (rows*cols) != 0:
self.from_idx = self.to_idx - (self._length % (rows*cols))
self.from_idx = self.to_idx - count
if self.lazy_loading:
return self._generate_items_normal()
[docs] def next_data_set(self):
Move to the next data set if available.
if self.data_set_idx is not None:
self.data_set_idx = self.data_set_idx + 1 if \
self.data_set_idx < self.data_sets_count else 1
[docs] def previous_data_set(self):
Move to the previous data set if avalaible.
if self.data_set_idx is not None:
self.data_set_idx = self.data_set_idx - 1 if \
self.data_set_idx > 1 else self.data_sets_count
[docs] def get_all_items(self):
Get all items from the current data set. Method compatible with
default topology mode of operation.
self.from_idx = 0
self.to_idx = self._length
return self._generate_items_flat()
[docs] def produce_data(self, raw_data, cmp_key_factory):
Generate list of `DataItems` out of some arbitrary raw data.
Produced list can be then used as the `data`. If some given
raw data item is None then it will remain None on a target list.
:param raw_data: container with tuples, each consisting of a
raw data item and dictionary of flags specific to this item.
:param cmp_key_factory: function to retrieve some comparison
key out of a given data item.
:return: list of `DataItems`.
return sorted([DataItem(item, cmp_key_factory(item), flags) for
item, flags in raw_data])
[docs] def clean_up(self):
Clean after any activities of the data source.
if self.lazy_loading:
# ----------------------- LAZY LOADING METHODS ---------------------- #
def _init_lazy_props(self):
Initialize all the properties specific to the lazy loader.
self._lazy_loading = False
# buffer for storing already, lazily, loaded data.
self._lazy_data = OrderedDict()
# list of data identifiers, specific for a given data supplier.
self._ids = []
# offset for lazy data
self.lazy_offset = None
# main lazy loading worker.
self._lazy_loader = LazyWorker(self)
def lazy_loading(self):
Switch the lazy loading mode - can be set True or False,
default is False. In the lazy loading mode, data source will
load only some limited portion of a given data at a time;
automatically load data that will probably be needed in
the future queries; store the already loaded data
in a proper order for a future use.
return self._lazy_loading
def lazy_loading(self, value):
self._lazy_loading = value
if value:
def _query_portion_of_data(self, ids):
Query the data provider for a portion of data with the given ids.
Should be implemented by child.
:params: ids: list of data identifiers.
:return: list of data items.
raise NotImplementedError
def _query_portion_of_data_by_number(self):
raise NotImplementedError
def _query_ids(self):
Query the data provider for a list of all the available data ids.
Should be implemented by child.
:return: list of ids.
raise NotImplementedError
def _set_up_lazy_loading(self):
Initialize all the necessary things and set up the lazy loader.
Should be always called on the lazy loader init.
def _check_ids_range(self):
Query to the data supplier for a list of identifiers of all the
available data. Update the `_lazy_data` container with previously
non-existing ids and prepare placeholders for data
items with these ids, that will be loaded later.
Update the main `data` buffer.
self._ids = self._query_ids()
[(str(ide), None) for ide in self._ids if
str(ide) not in self._lazy_data])
self.data = [None for _ in self._lazy_data]
def _schedule_sending_data(self, direction):
Schedule sending the data as soon as it is available.
Data should be loaded in a background.
:param direction: -1 or 1, that is whether data should be
sent from backward or forward.
Clutter.threads_add_timeout(0, 100, self._send_data, direction)
def _send_data(self, direction):
Send the data somewhere.
:param direction: data in which direction should be sent.
:return: True when no data available or False after sending the data.
if self._has_data(direction):
if not callable(self.on_new_data):
raise exceptions.PisakException(
'No data receiver has been declared.')
except TypeError as exc:
return False
return True
def _has_data(self, direction):
Check if there is a portion of data available, in the given direction.
Data is checked with some offset, just to be sure, in a case when some
indexing has been messed up.
:param direction: -1 or 1, that is whether data should be
checked backward or forward.
:return: True or False
offset = self._lazy_loader.step
if direction == -1:
from_idx = max(self.from_idx - offset, 0)
to_idx = self.to_idx
elif direction == 1:
from_idx = self.from_idx
to_idx = min(self.to_idx + offset, self._length)
raise ValueError('Invalid direction. Must be -1 or 1.')
return to_idx <= len(self.data) and all(self.data[from_idx : to_idx])
def _clean_up_lazy(self):
Take any actions necessary for cleaning after the lazy loader.
[docs] def get_data_ids_list(self):
Get list of identifiers of all the data items.
return self._ids.copy()
# ----------------- END OF LAZY LOADING METHODS ------------------ #
class _Page(scanning.Group):
Page widget supplied to pager as its content.
def __init__(self, items, spacing, strategy, sound, row_sounds):
self.items = [] # flat container for the page content
self.strategy = strategy
self.sound = sound
self.row_sounds = row_sounds
self.layout = Clutter.BoxLayout()
self._add_items(items, spacing)
def _add_items(self, items, spacing):
for index, row in enumerate(items):
group = scanning.Group()
group.strategy = scanning.RowStrategy()
group.sound = self.row_sounds[index]
except IndexError:
group.sound = str(index + 1) if index + 1 < 10 else 'scan'
group.strategy.unwind_to = self.strategy.unwind_to or self
group.strategy.max_cycle_count = self.strategy.max_cycle_count
group.strategy.interval = self.strategy.interval
group_box = layout.Box()
group_box.spacing = spacing
for item in row:
def adjust_content(self):
Adjust content of the page using any internal settings
held by the items themselves. Before adjusting, one should ensure
that page is already aware of its parent and whole environment.
As for now, any adjusting specifications to be applied should be
stored by an item itself and exposed by 'adjust' method.
for item in self.items:
if hasattr(item, "adjust") and callable(item.adjust):
[docs]class PageFlip(scanning.Group):
A one-purpose only mechanism, wrapped into artificial scanning group,
instantaneously flips a pager page when started.
__gtype_name__ = "PisakPageFlip"
__gproperties__ = {
"target": (PagerWidget.__gtype__, "", "", GObject.PARAM_READWRITE),
def __init__(self):
self._target = None
def target(self):
:class:`PagerWidget` instance.
return self._target
def target(self, value):
self._target = value
[docs] def start_cycle(self):
Reimplementation of the generic scanning group method.
Flips the target pager page and schedules the start
of the page scanning cycle.
if self._target is not None:
Clutter.threads_add_timeout(0, self._target.transition_duration,
lambda *_: self._target.scan_page(),
_LOG.warning("PageFlip without target: " + self.get_id())