
Public API

All public packages, functions, and classes are available in this module.

IDSMapping(ids, exclude_empty=True, allow_blind_keys=False)

Bases: Mapping

Parameters:

  • ids –

    The raw IMAS IDS object to map.

  • exclude_empty (bool) –

    Exclude empty (zero-size) data fields from the mapping.

  • allow_blind_keys (bool) –

    Allow getting and inserting keys which are not in _keys, but could still fit in the ids.

Source code in duqtools/ids/_mapping.py
def __init__(self,
             ids,
             exclude_empty: bool = True,
             allow_blind_keys: bool = False):
    """__init__

    Parameters
    ----------
    ids :
        ids
    exclude_empty : bool
        exclude_empty
    allow_blind_keys : bool
        allows for the getting and inserting of keys which are not in the _keys,
        but could still fit in the ids
    """
    self._ids = ids
    self.exclude_empty = exclude_empty
    self.allow_blind_keys = allow_blind_keys

    # All available data fields are stored in this set.
    self._keys: Set[str] = set()
    self._paths: Dict[str, Any] = {}

    self.dive(ids, [])
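
A minimal usage sketch; the entry location is hypothetical, and an IDSMapping is usually constructed indirectly via ImasHandle.get:

# Sketch only: assumes an IMAS entry exists at this (hypothetical) location.
from duqtools.ids import ImasHandle

handle = ImasHandle(user='g2user', db='jet', shot=91234, run=555)
mapping = handle.get('core_profiles')  # returns an IDSMapping
print(sorted(mapping._keys)[:5])       # flat '/'-joined data paths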

dive(val, path)

Recursively find the data fields.

Parameters:

  • val –

    Current nested object being evaluated

  • path (List) –

    Current path

Source code in duqtools/ids/_mapping.py
def dive(self, val, path: list):
    """Recursively find the data fields.

    Parameters
    ----------
    val :
        Current nested object being evaluated
    path : List
        Current path
    """

    if isinstance(val, str):
        return

    # Recurse into sequence-like containers, but not into numpy arrays.
    if hasattr(val, '__getitem__') and not isinstance(
            val, (np.ndarray, np.generic)):
        for i in range(len(val)):
            item = val[i]
            self.dive(item, path + [str(i)])
        return

    if hasattr(val, '__dict__'):
        for key, item in val.__dict__.items():
            self.dive(item, path + [key])
        return

    if not isinstance(val, (np.ndarray, np.generic)):
        return

    if self.exclude_empty and val.size == 0:
        return

    # We made it here, the value can be stored
    str_path = '/'.join(path)
    self._keys.add(str_path)

    cur = self._paths
    for part in path[:-1]:
        cur.setdefault(part, {})
        cur = cur[part]
    cur[path[-1]] = str_path
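
dive runs from the constructor; afterwards every stored field is reachable by its flat path. For illustration (paths assume a core_profiles IDS and are hypothetical):

# `mapping` from the example above; paths below are illustrative.
'profiles_1d/0/zeff' in mapping._keys   # True if the field holds data
mapping['profiles_1d/0/zeff']           # -> numpy array for that field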

find_by_group(pattern)

Find keys matching regex pattern by group.

The dict key is defined by match.groups(). Dict entries will be overwritten if the groups are not unique.

Parameters:

  • pattern (str) –

    Regex pattern (must contain groups)

Returns:

  • dict –

    New dict with all matching key/value pairs.

Source code in duqtools/ids/_mapping.py
def find_by_group(self, pattern: str) -> Dict[Union[tuple, str], Any]:
    """Find keys matching regex pattern by group.

    The dict key is defined by `match.groups()`.
    Dict entries will be overwritten if the groups are not unique.

    Parameters
    ----------
    pattern : str
        Regex pattern (must contain groups)

    Returns
    -------
    dict
        New dict with all matching key/value pairs.
    """
    pattern = insert_re_caret_dollar(pattern)

    pat = re.compile(pattern)

    new = {}
    for key in self._keys:
        m = pat.match(key)
        if m:
            groups = m.groups()
            idx = groups[0] if len(groups) == 1 else groups
            new[idx] = self[key]

    return new
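
For example, reusing the mapping from above (the path is illustrative); the matched regex group becomes the dict key:

# Group zeff profiles by time index; one group -> plain string keys.
result = mapping.find_by_group(r'profiles_1d/(\d+)/zeff')
for time_index, zeff in result.items():
    print(time_index, zeff.shape)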

find_by_index(pattern)

Find keys matching regex pattern using time index.

Must include $i, a special token that matches an integer (\d+).

For example, ids.find_by_index('profiles_1d/$i/zeff.*') returns a dict with zeff and its error attributes.

Parameters:

  • pattern (str) –

    Regex pattern, must include a group matching a digit.

Returns:

  • dict –

    New dict with all matching key/value pairs.

Source code in duqtools/ids/_mapping.py
def find_by_index(self, pattern: str) -> Dict[str, Dict[int, np.ndarray]]:
    """Find keys matching regex pattern using time index.

    Must include $i, which is a special character that matches
    an integer (`\\d+`)

    e.g. `ids.find_by_index('profiles_1d/$i/zeff.*')`
    returns a dict with `zeff` and error attributes.

    Parameters
    ----------
    pattern : str
        Regex pattern, must include a group matching a digit.

    Returns
    -------
    dict
        New dict with all matching key/value pairs.
    """
    idx_str = '$i'

    if idx_str not in pattern:
        raise ValueError(f'Pattern must include {idx_str} to match index.')

    pattern = insert_re_caret_dollar(pattern)

    pattern = pattern.replace(idx_str, r'(?P<idx>\d+)')
    pat = re.compile(pattern)

    new_dict: Dict[str, Dict[int, np.ndarray]] = defaultdict(dict)

    for key in self._keys:
        m = pat.match(key)

        if m:
            si, sj = m.span('idx')
            new_key = key[:si] + idx_str + key[sj:]

            idx = int(m.group('idx'))
            new_dict[new_key][idx] = self[key]

    return new_dict
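
A sketch with the mapping from above (path is illustrative); the $i token is replaced internally by the named group (?P&lt;idx&gt;\d+):

result = mapping.find_by_index('profiles_1d/$i/zeff.*')
# keys keep the `$i` placeholder; values map time index -> array
zeff_by_step = result['profiles_1d/$i/zeff']  # {0: array, 1: array, ...}
print(zeff_by_step[0])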

findall(pattern)

Find keys matching regex pattern.

Parameters:

  • pattern (str) –

    Regex pattern

Returns:

  • dict –

    New dict with all matching key/value pairs.

Source code in duqtools/ids/_mapping.py
def findall(self, pattern: str) -> Dict[str, Any]:
    """Find keys matching regex pattern.

    Parameters
    ----------
    pattern : str
        Regex pattern

    Returns
    -------
    dict
        New dict with all matching key/value pairs.
    """
    pattern = insert_re_caret_dollar(pattern)

    pat = re.compile(pattern)

    return {key: self[key] for key in self._keys if pat.match(key)}
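
A sketch with the mapping from above; patterns are anchored internally (insert_re_caret_dollar), so use .* for partial matches:

# All stored fields below the first time step (path is illustrative).
matches = mapping.findall('profiles_1d/0/.*')
print(sorted(matches))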

sync(target)

Synchronize updated data back to IMAS db entry.

Shortcut for 'put' command.

Parameters:

  • target (ImasHandle) –

    Points to an IMAS db entry of where the data should be written.

Source code in duqtools/ids/_mapping.py
def sync(self, target: ImasHandle):
    """Synchronize updated data back to IMAS db entry.

    Shortcut for 'put' command.

    Parameters
    ----------
    target : ImasHandle
        Points to an IMAS db entry of where the data should be written.
    """

    add_provenance_info(target)

    with target.open() as db_entry:
        self._ids.put(db_entry=db_entry)
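
A sketch of sync, writing the (possibly modified) data back to a hypothetical target entry:

target = ImasHandle.from_string('jet/91234/556')  # hypothetical target run
mapping.sync(target)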

to_dataframe(*variables, prefix='profiles_1d', time_steps=None)

Return long format dataframe for given variables.

Search string: {prefix}/{time_step}/{variable}

Parameters:

  • *variables (str) –

    Keys to extract, e.g. zeff, grid/rho_tor

  • prefix (str, optional) –

    First part of the data path

  • time_steps (Sequence[int], optional) –

    List or array of integer time steps to extract. Defaults to all time steps.

Returns:

  • df (pd.DataFrame) –

    Contains a column for the time step and each of the variables.

Source code in duqtools/ids/_mapping.py
def to_dataframe(self,
                 *variables: str,
                 prefix: str = 'profiles_1d',
                 time_steps: Sequence[int] = None) -> pd.DataFrame:
    """Return long format dataframe for given variables.

    Search string:
    `{prefix}/{time_step}/{variable}`

    Parameters
    ----------
    *variables : str
        Keys to extract, e.g. `zeff`, `grid/rho_tor`
    prefix : str, optional
        First part of the data path
    time_steps : Sequence[int], optional
        List or array of integer time steps to extract.
        Defaults to all time steps.

    Returns
    -------
    df : pd.DataFrame
        Contains a column for the time step and each of the variables.
    """
    import pandas as pd
    columns, arr = self.to_numpy(*variables,
                                 prefix=prefix,
                                 time_steps=time_steps)

    df = pd.DataFrame(arr, columns=columns)
    df[TSTEP_COL] = df[TSTEP_COL].astype(int)

    return df
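
A sketch of to_dataframe with the mapping from above (variable names are illustrative):

# Long-format dataframe for two variables at selected time steps.
df = mapping.to_dataframe('zeff', 'grid/rho_tor', time_steps=[0, 1, 2])
# one row per (time step, grid point); the time step column comes first
print(df.head())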

to_numpy(*variables, prefix='profiles_1d', time_steps=None)

Return numpy array containing data for given variables.

Search string: {prefix}/{time_step}/{variable}

Parameters:

  • *variables (str) –

    Keys to extract, e.g. zeff, grid/rho_tor

  • prefix (str, optional) –

    First part of the data path

  • time_steps (Sequence[int], optional) –

    List or array of integer time steps to extract. Defaults to all time steps.

Returns:

  • columns, array (Tuple[Tuple[str, ...], np.ndarray]) –

    Numpy array with a column for the time step and each of the variables.

Source code in duqtools/ids/_mapping.py
def to_numpy(
    self,
    *variables: str,
    prefix: str = 'profiles_1d',
    time_steps: Sequence[int] = None
) -> Tuple[Tuple[str, ...], np.ndarray]:
    """Return numpy array containing data for given variables.

    Search string:
    `{prefix}/{time_step}/{variable}`

    Parameters
    ----------
    *variables : str
        Keys to extract, e.g. `zeff`, `grid/rho_tor`
    prefix : str, optional
        First part of the data path
    time_steps : Sequence[int], optional
        List or array of integer time steps to extract.
        Defaults to all time steps.

    Returns
    -------
    columns, array : Tuple[Tuple[str, ...], np.ndarray]
        Numpy array with a column for the time step and each of the
        variables.
    """
    points_per_var = len(self[f'{prefix}/0/{variables[0]}'])

    if time_steps is None:  # avoid `not arr`, which is ambiguous for arrays
        n_time_steps = len(self[TIME_COL])
        time_steps = range(n_time_steps)
    else:
        n_time_steps = len(time_steps)

    columns = (TSTEP_COL, TIME_COL, *variables)
    n_vars = len(columns)

    arr = np.empty((n_time_steps * points_per_var, n_vars))

    timestamps = self[TIME_COL]

    # Enumerate so that rows stay contiguous even when `time_steps`
    # is a subset of the available time steps.
    for i, t in enumerate(time_steps):
        for j, variable in enumerate(variables):
            flat_variable = f'{prefix}/{t}/{variable}'

            i_begin = i * points_per_var
            i_end = i_begin + points_per_var

            arr[i_begin:i_end, 0] = t
            arr[i_begin:i_end, 1] = timestamps[t]
            arr[i_begin:i_end, j + 2] = self[flat_variable]

    return columns, arr
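
A sketch of to_numpy with the mapping from above:

columns, arr = mapping.to_numpy('zeff', prefix='profiles_1d')
# `arr` has one row per (time step, grid point); `columns` labels the
# array columns: time step, time, then each requested variable.
print(columns, arr.shape)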

ImasHandle

Bases: ImasBaseModel

copy_data_to(destination)

Copy ids entry to given destination.

Parameters:

  • destination (ImasHandle) –

    Copy data to a new location.

Source code in duqtools/ids/_handle.py
def copy_data_to(self, destination: ImasHandle):
    """Copy ids entry to given destination.

    Parameters
    ----------
    destination : ImasHandle
        Copy data to a new location.
    """
    logger.debug('Copy %s to %s', self, destination)
    copy_ids_entry(self, destination)
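
A sketch of copy_data_to, copying to a hypothetical scratch run before modifying it:

destination = ImasHandle.from_string('jet/91234/9999')  # hypothetical run
handle.copy_data_to(destination)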

delete()

Remove data from entry.

Source code in duqtools/ids/_handle.py
@add_to_op_queue('Removing ids', '{self}')
def delete(self):
    """Remove data from entry."""

    # The ERASE_PULSE operation is not yet supported by IMAS as of June 2022
    path = self.path()
    for suffix in SUFFIXES:
        to_delete = path.with_suffix(suffix)
        logger.debug('Removing %s', to_delete)
        try:
            to_delete.unlink()
        except FileNotFoundError:
            logger.warning('%s does not exist', to_delete)

entry(backend=imasdef.MDSPLUS_BACKEND)

Return reference to imas.DBEntry.

Parameters:

  • backend (optional) –

    Which IMAS backend to use

Returns:

  • entry –

    IMAS database entry

Source code in duqtools/ids/_handle.py
def entry(self, backend=imasdef.MDSPLUS_BACKEND):
    """Return reference to `imas.DBEntry.`

    Parameters
    ----------
    backend : optional
        Which IMAS backend to use

    Returns
    -------
    entry : `imas.DBEntry`
        IMAS database entry
    """
    return imas.DBEntry(backend, self.db, self.shot, self.run, self.user)

exists()

Return true if the directory exists.

Returns:

  • bool –

    True if the files for this IMAS entry exist.

Source code in duqtools/ids/_handle.py
def exists(self) -> bool:
    """Return true if the directory exists.

    Returns
    -------
    bool
    """
    path = self.path()
    return all(path.with_suffix(sf).exists() for sf in SUFFIXES)
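
A minimal guard using exists (handle as in the examples above):

if not handle.exists():
    raise SystemExit(f'No data at {handle.to_string()}')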

from_string(string) classmethod

Return location from formatted string.

Format:

<user>/<db>/<shot>/<run>
<db>/<shot>/<run>

Defaults to the current user if the user is not specified.

For example:

g2user/jet/91234/555

Parameters:

  • string (str) –

    Input string containing imas db path

Returns:

  • ImasHandle –

Source code in duqtools/ids/_handle.py
@classmethod
def from_string(cls, string: str) -> ImasHandle:
    """Return location from formatted string.

    Format:

        <user>/<db>/<shot>/<run>
        <db>/<shot>/<run>

    Defaults to the current user if the user is not specified.

    For example:

        g2user/jet/91234/555

    Parameters
    ----------
    string : str
        Input string containing imas db path

    Returns
    -------
    ImasHandle
    """
    match = IMAS_PATTERN.match(string)

    if match:
        return cls(**match.groupdict())

    raise ValueError(f'Could not match {string!r}')
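
Usage follows directly from the docstring examples:

handle = ImasHandle.from_string('g2user/jet/91234/555')
# or, defaulting to the current user:
handle = ImasHandle.from_string('jet/91234/555')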

get(key='core_profiles', **kwargs)

Map the data to a dict-like structure.

Parameters:

  • key (str, optional) –

    Name of profiles to open

  • **kwargs –

    These parameters are passed to initialize IDSMapping.

Returns:

  • IDSMapping –

Source code in duqtools/ids/_handle.py
def get(self, key: str = 'core_profiles', **kwargs) -> IDSMapping:
    """Map the data to a dict-like structure.

    Parameters
    ----------
    key : str, optional
        Name of profiles to open
    **kwargs
        These parameters are passed to initialize `IDSMapping`.

    Returns
    -------
    IDSMapping
    """
    raw_data = self.get_raw_data(key)
    return IDSMapping(raw_data, **kwargs)

get_raw_data(key='core_profiles', **kwargs)

Get data from IDS entry.

Parameters:

  • key (str, optional) –

    Name of profiles to open.

  • **kwargs –

    These keyword parameters are passed to ImasHandle.open().

Returns:

  • data –

    Raw IDS data for the given key.

Source code in duqtools/ids/_handle.py
def get_raw_data(self, key: str = 'core_profiles', **kwargs):
    """Get data from IDS entry.

    Parameters
    ----------
    key : str, optional
        Name of profiles to open.
    **kwargs
        These keyword parameters are passed to `ImasHandle.open()`.

    Returns
    -------
    data
    """
    with self.open(**kwargs) as data_entry:
        data = data_entry.get(key)

    # reset string representation because output is extremely lengthy
    _patch_str_repr(data)

    return data

open(backend=imasdef.MDSPLUS_BACKEND, create=False)

Context manager to open database entry.

Parameters:

  • backend (optional) –

    Which IMAS backend to use

  • create (bool, optional) –

    Create empty database entry if it does not exist.

Yields:

  • entry –

    Opened IMAS database entry

Source code in duqtools/ids/_handle.py
@contextmanager
def open(self, backend=imasdef.MDSPLUS_BACKEND, create: bool = False):
    """Context manager to open database entry.

    Parameters
    ----------
    backend : optional
        Which IMAS backend to use
    create : bool, optional
        Create empty database entry if it does not exist.

    Yields
    ------
    entry : `imas.DBEntry`
        Opened IMAS database entry
    """
    entry = self.entry(backend=backend)
    opcode, _ = entry.open()

    if opcode == 0:
        logger.debug('Data entry opened: %s', self)
    elif create:
        cpcode, _ = entry.create()
        if cpcode == 0:
            logger.debug('Data entry created: %s', self)
        else:
            raise IOError(
                f'Cannot create data entry: {self}. '
                f'Create a new db first using `imasdb {self.db}`')
    else:
        raise IOError(f'Data entry does not exist: {self}')

    try:
        yield entry
    finally:
        entry.close()
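
A sketch of open, creating the entry if it does not exist (handle as above):

with handle.open(create=True) as entry:
    data = entry.get('core_profiles')  # `entry` is an opened imas.DBEntry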

path()

Return location as Path.

Source code in duqtools/ids/_handle.py
def path(self) -> Path:
    """Return location as Path."""
    return Path(
        PATH_TEMPLATE.format(user=self.user,
                             db=self.db,
                             shot=self.shot,
                             run=self.run,
                             suffix=SUFFIXES[0]))

to_string()

Generate string representation of Imas location.

Source code in duqtools/ids/_handle.py
def to_string(self) -> str:
    """Generate string representation of Imas location."""
    return f'{self.user}/{self.db}/{self.shot}/{self.run}'

alt_errorband_chart(source, *, x, y)

Generate an altair errorband plot from a dataframe.

The dataframe must be generated using duqtools.ids.get_ids_dataframe (or have the same format).

Parameters:

  • source (pd.DataFrame) –

    Input dataframe

  • x (str) –

    X-value to plot, corresponds to a column in the source data

  • y (str) –

    Y-value to plot, corresponds to a column in the source data

Returns:

  • alt.Chart –

    Return an altair chart.

Source code in duqtools/_plot_utils.py
def alt_errorband_chart(source: pd.DataFrame, *, x: str, y: str) -> alt.Chart:
    """Generate an altair errorband plot from a dataframe.

    The dataframe must be generated using `duqtools.ids.get_ids_dataframe` (or
    have the same format).

    Parameters
    ----------
    source : pd.DataFrame
        Input dataframe
    x : str
        X-value to plot, corresponds to a column in the source data
    y : str
        Y-value to plot, corresponds to a column in the source data

    Returns
    -------
    alt.Chart
        Return an altair chart.
    """
    slider = alt.binding_range(min=0, max=source['tstep'].max(), step=1)
    select_step = alt.selection_single(name='tstep',
                                       fields=['tstep'],
                                       bind=slider,
                                       init={'tstep': 0})

    line = alt.Chart(source).mark_line().encode(
        x=f'{x}:Q',
        y=f'mean({y}):Q',
        color=alt.Color('tstep:N'),
    ).add_selection(select_step).transform_filter(select_step).interactive()

    # altair-viz.github.io/user_guide/generated/core/altair.ErrorBandDef
    band = alt.Chart(source).mark_errorband(
        extent='stdev', interpolate='linear').encode(
            x=f'{x}:Q',
            y=f'{y}:Q',
            color=alt.Color('tstep:N'),
        ).add_selection(select_step).transform_filter(
            select_step).interactive()

    chart = line + band

    return chart
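
A sketch, assuming df was produced by get_ids_dataframe (column names are illustrative):

chart = alt_errorband_chart(df, x='grid/rho_tor', y='zeff')
chart.save('zeff_errorband.html')  # standalone HTML with the tstep slider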

alt_line_chart(source, *, x, y)

Generate an altair line chart from a dataframe.

The dataframe must be generated using duqtools.ids.get_ids_dataframe (or have the same format).

Parameters:

  • source (pd.DataFrame) –

    Input dataframe

  • x (str) –

    X-value to plot, corresponds to a column in the source data

  • y (str) –

    Y-value to plot, corresponds to a column in the source data

Returns:

  • alt.Chart –

    Return an altair chart.

Source code in duqtools/_plot_utils.py
def alt_line_chart(source: pd.DataFrame, *, x: str, y: str) -> alt.Chart:
    """Generate an altair line chart from a dataframe.

    The dataframe must be generated using `duqtools.ids.get_ids_dataframe` (or
    have the same format).

    Parameters
    ----------
    source : pd.DataFrame
        Input dataframe
    x : str
        X-value to plot, corresponds to a column in the source data
    y : str
        Y-value to plot, corresponds to a column in the source data

    Returns
    -------
    alt.Chart
        Return an altair chart.
    """
    slider = alt.binding_range(min=0, max=source['tstep'].max(), step=1)
    select_step = alt.selection_single(name='tstep',
                                       fields=['tstep'],
                                       bind=slider,
                                       init={'tstep': 0})

    chart = alt.Chart(source).mark_line().encode(
        x=f'{x}:Q',
        y=f'{y}:Q',
        color=alt.Color('run:N'),
        tooltip='run',
    ).add_selection(select_step).transform_filter(select_step).interactive()

    return chart
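
A sketch, with the same assumptions as for alt_errorband_chart above:

chart = alt_line_chart(df, x='grid/rho_tor', y='zeff')
chart.save('zeff_lines.html')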

get_ids_dataframe(handles, *, keys, **kwargs)

Read a dict of IMAS handles into a structured pandas dataframe.

The returned dataframe will have the columns:

`run`, `tstep`, `times`, `<ids_col_1>`, `<ids_col_2>`, ...

Where tstep corresponds to the time index, and times to the actual times.

Parameters:

  • handles (Union[Sequence[ImasHandle], Dict[str, ImasHandle]]) –

    Dict with IMAS handles. The key is used as the 'run' name in the dataframe. If the handles are given as a sequence, the IMAS string representation will be used as the key.

  • keys (Sequence[str]) –

    IDS values to extract. These will be used as columns in the data frame.

  • **kwargs –

    These keyword parameters are passed to duqtools.ids.IDSMapping.to_dataframe.

Returns:

Source code in duqtools/ids/_io.py
def get_ids_dataframe(handles: Union[Sequence[ImasHandle],
                                     Dict[str, ImasHandle]], *,
                      keys: Sequence[str], **kwargs) -> pd.DataFrame:
    """Read a dict of IMAS handles into a structured pandas dataframe.

    The returned dataframe will have the columns:

        `run`, `tstep`, `times`, `<ids_col_1>`, `<ids_col_2>`, ...

    Where `tstep` corresponds to the time index, and `times` to the
    actual times.

    Parameters
    ----------
    handles : Union[Sequence[ImasHandle], Dict[str, ImasHandle]]
        Dict with IMAS handles. The key is used as the 'run' name in
        the dataframe. If the handles are given as a sequence, the
        IMAS string representation will be used as the key.
    keys : Sequence[str]
        IDS values to extract. These will be used as columns in the
        data frame.
    **kwargs
        These keyword parameters are passed to
        `duqtools.ids.IDSMapping.to_dataframe`.

    Returns
    -------
    pd.DataFrame
        Structured pandas dataframe.
    """
    import pandas as pd

    if not isinstance(handles, dict):
        handles = {handle.to_string(): handle for handle in handles}

    runs_data = {
        str(name): _get_ids_run_dataframe(handle, keys=keys, **kwargs)
        for name, handle in handles.items()
    }

    return pd.concat(runs_data,
                     names=('run',
                            'index')).reset_index('run').reset_index(drop=True)

rebase_on_grid(source, *, grid, cols, grid_base=None)

Rebase data on new ids basis using interpolation.

This operation makes sure that all data on the x-axis are the same for each run and time step.

Uses scipy.interpolate.interp1d.

Parameters:

  • source (pd.DataFrame) –

    Input data, contains the columns 'run', 'tstep' and any number of ids columns.

  • grid (str) –

    This defines the base ids column that the new base belongs to. In other words, this is the x column in the interpolation.

  • cols (Sequence[str]) –

    The data in these ids columns will be interpolated. In other words, these are the y columns in the interpolation. IDS columns not defined by grid and cols will be omitted from the output.

  • grid_base (np.ndarray, optional) –

    Numpy array with the new base values for the given base column. If not defined, use the data in the base column of the first time step of the first run as the basis.

Returns:

  • pd.DataFrame –

    For the returned dataframe, for each run and time step, the values in the base column will be the same.

Source code in duqtools/ids/_rebase.py
def rebase_on_grid(source: pd.DataFrame,
                   *,
                   grid: str,
                   cols: Sequence[str],
                   grid_base: np.ndarray = None) -> pd.DataFrame:
    """Rebase data on new ids basis using interpolation.

    This operation makes sure that all data on the x-axis are the same for
    each run and time step.

    Uses [scipy.interpolate.interp1d][].

    Parameters
    ----------
    source : pd.DataFrame
        Input data, contains the columns 'run', 'tstep' and any number of
        ids columns.
    grid : str
        This defines the base ids column that the new base belongs to.
        In other words, this is the `x` column in the interpolation.
    cols : Sequence[str]
        The data in these ids columns will be interpolated.
        In other words, these are the `y` columns in the interpolation.
        IDS columns not defined by grid and cols will be omitted
        from the output.
    grid_base : np.ndarray, optional
        Numpy array with the new base values for the given base column.
        If not defined, use the data in the base column of the first time
        step of the first run as the basis.

    Returns
    -------
    pd.DataFrame
        For the returned dataframe, for each run and time step,
        the values in the base column will be the same.
    """
    if grid_base is None:
        first_run = source.iloc[0].run
        idx = (source[RUN_COL] == first_run) & (source[TSTEP_COL] == 0)
        grid_base = source[idx][grid]
        logger.debug('Rebase ids on %s, using %s from %d to %d with %d steps',
                     first_run, grid, grid_base.min(), grid_base.max(),
                     len(grid_base))

    def refit(gb: pd.DataFrame) -> pd.DataFrame:
        new_values = []

        for value_col in cols:
            f = interp1d(gb[grid],
                         gb[value_col],
                         fill_value='extrapolate',
                         bounds_error=False)
            new_values.append(f(grid_base))

        df = pd.DataFrame((grid_base, *new_values), index=[grid, *cols]).T

        df[TIME_COL] = gb[TIME_COL].iloc[0]
        return df

    grouped = source.groupby([RUN_COL, TSTEP_COL])

    out = grouped.apply(refit).reset_index(
        (RUN_COL, TSTEP_COL)).reset_index(drop=True)

    out = out[[RUN_COL, TSTEP_COL, TIME_COL, grid, *cols]]
    return out
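
A sketch, rebasing onto an explicit 101-point grid (column names are illustrative, df as produced above):

import numpy as np

df = rebase_on_grid(df, grid='grid/rho_tor', cols=('zeff',),
                    grid_base=np.linspace(0, 1, 101))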

rebase_on_time(source, *, cols, time_base=None)

Rebase data on new time basis using interpolation.

This operation makes sure that each run has the same time steps.

Uses scipy.interpolate.interp1d.

Parameters:

  • source (pd.DataFrame) –

    Input data, contains the columns 'run', 'tstep' and any number of ids columns.

  • cols (Sequence[str]) –

    This defines the columns that should be rebased. IDS columns not defined will be omitted from the output.

  • time_base (np.ndarray, optional) –

    Numpy array with the new base values for the time steps. If not defined, use the time steps in the first run of the source data.

Returns:

  • pd.DataFrame –

    For the returned dataframe, for each run the time steps will be the same.

Source code in duqtools/ids/_rebase.py
def rebase_on_time(source: pd.DataFrame,
                   *,
                   cols: Sequence[str],
                   time_base: np.ndarray = None) -> pd.DataFrame:
    """Rebase data on new time basis using interpolation.

    This operation makes sure that each run has the same time steps.

    Uses [scipy.interpolate.interp1d][].

    Parameters
    ----------
    source : pd.DataFrame
        Input data, contains the columns 'run', 'tstep' and any number of
        ids columns.
    cols : Sequence[str]
        This defines the columns that should be rebased.
        IDS columns not defined will be omitted from the output.
    time_base : np.ndarray, optional
        Numpy array with the new base values for the time steps.
        If not defined, use the time steps in the first run of the
        source data.

    Returns
    -------
    pd.DataFrame
        For the returned dataframe, for each run the time steps will
        be the same.
    """
    if time_base is None:
        first_run = source.iloc[0].run
        time_base = source[source[RUN_COL] == first_run][TIME_COL].unique()
        logger.debug('Rebase time on %s, from %d to %d with %d steps',
                     first_run, time_base.min(), time_base.max(),
                     len(time_base))

    cols = list(cols)

    n_cols = len(cols)
    n_time_new = len(time_base)

    def refit(gb: pd.DataFrame) -> pd.DataFrame:
        time = gb[TIME_COL].unique()
        values = np.array(gb[cols])

        n_times = len(time)
        n_vals = int(len(values) / n_times)

        values = values.reshape(n_times, n_vals, n_cols).T

        f = interp1d(time,
                     values,
                     fill_value='extrapolate',
                     bounds_error=False)

        values_new = f(time_base)
        values_new = values_new.T.reshape(n_time_new * n_vals, n_cols)

        time_new = np.repeat(time_base, n_vals).reshape(-1, 1)

        tstep_new = np.repeat(np.arange(n_time_new), n_vals).reshape(-1, 1)

        arr = np.hstack((tstep_new, time_new, values_new))

        out = pd.DataFrame(arr, columns=[TSTEP_COL, TIME_COL, *cols])
        out[TSTEP_COL] = out[TSTEP_COL].astype(np.int64)
        return out

    grouped = source.groupby([RUN_COL])

    out = grouped.apply(refit).reset_index(RUN_COL).reset_index(drop=True)
    return out