Skip to content

HmsResults

Results extraction and analysis from DSS files.

hms_commander.HmsResults

HmsResults - HEC-HMS Results Extraction and Analysis

This module provides static methods for extracting and analyzing HEC-HMS simulation results from DSS files.

All methods are static and designed to be used without instantiation.

HmsResults

Extract and analyze HMS simulation results from DSS files.

Provides high-level methods for accessing flow, precipitation, and other hydrologic results with statistical analysis.

All methods are static - no instantiation required.

Example

from hms_commander import HmsResults
flow = HmsResults.get_outflow_timeseries("results.dss", "Outlet")
peaks = HmsResults.get_peak_flows("results.dss")
print(peaks)

Source code in hms_commander/HmsResults.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class HmsResults:
    """
    Extract and analyze HMS simulation results from DSS files.

    Provides high-level methods for accessing flow, precipitation, and
    other hydrologic results with statistical analysis.

    All methods are static - no instantiation required.

    Example:
        >>> from hms_commander import HmsResults
        >>> flow = HmsResults.get_outflow_timeseries("results.dss", "Outlet")
        >>> peaks = HmsResults.get_peak_flows("results.dss")
        >>> print(peaks)
    """

    # Square feet in one acre: cubic feet / 43,560 = acre-feet.
    _SQFT_PER_ACRE = 43560.0

    @staticmethod
    def _flow_volume_acre_feet(times, flows) -> float:
        """
        Integrate a flow hydrograph into a volume in acre-feet.

        Uses the trapezoidal rule over the actual timestamps, so irregular
        time steps are handled correctly.

        Args:
            times: Datetime index of the samples (same length as ``flows``)
            flows: Flow values, assumed to be in cubic feet per second

        Returns:
            Volume in acre-feet; 0.0 when fewer than two samples are given,
            because a single point spans no time.
        """
        flow_values = np.asarray(flows, dtype=float)
        if flow_values.size < 2:
            return 0.0

        # Seconds since the first sample: cfs * s = cubic feet.
        elapsed_s = np.asarray((times - times[0]).total_seconds(), dtype=float)

        # Explicit trapezoid sum; avoids np.trapz, which is deprecated in
        # NumPy 2.x in favour of np.trapezoid.
        cubic_feet = float(
            np.sum((flow_values[1:] + flow_values[:-1]) * np.diff(elapsed_s)) / 2.0
        )
        return cubic_feet / HmsResults._SQFT_PER_ACRE

    @staticmethod
    @log_call
    def get_outflow_timeseries(
        dss_file: Union[str, Path],
        element_name: str,
        run_name: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get outflow time series for a specific element.

        Args:
            dss_file: Path to the DSS file
            element_name: Name of the element (junction, outlet, etc.)
            run_name: Optional run name filter

        Returns:
            DataFrame with datetime index and a single 'flow' column

        Raises:
            ValueError: If no flow record matches ``element_name``

        Example:
            >>> flow = HmsResults.get_outflow_timeseries("results.dss", "Outlet")
            >>> print(flow.head())
        """
        dss_file = Path(dss_file)

        # Get catalog and find matching flow path
        catalog = HmsDss.get_catalog(dss_file)

        matching_paths = select_result_paths(
            catalog,
            result_type="flow",
            element_names=[element_name],
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        if not matching_paths:
            raise ValueError(f"No flow data found for element '{element_name}'")

        # Only the first match is read; no ordering is applied here, so the
        # choice among several matches follows select_result_paths' order.
        path = matching_paths[0]
        logger.info(f"Reading flow data from: {path}")

        df = HmsDss.read_timeseries(dss_file, path)
        df.columns = ['flow']
        return df

    @staticmethod
    @log_call
    def get_precipitation_timeseries(
        dss_file: Union[str, Path],
        element_name: str,
        run_name: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get precipitation time series for a specific element.

        Args:
            dss_file: Path to the DSS file
            element_name: Name of the subbasin or gage
            run_name: Optional run name filter

        Returns:
            DataFrame with datetime index and a single 'precipitation' column

        Raises:
            ValueError: If no precipitation record matches ``element_name``
        """
        dss_file = Path(dss_file)
        catalog = HmsDss.get_catalog(dss_file)

        matching_paths = select_result_paths(
            catalog,
            result_type="precipitation",
            element_names=[element_name],
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        if not matching_paths:
            raise ValueError(f"No precipitation data found for element '{element_name}'")

        # Only the first matching record is read.
        path = matching_paths[0]
        df = HmsDss.read_timeseries(dss_file, path)
        df.columns = ['precipitation']
        return df

    @staticmethod
    @log_call
    def get_peak_flows(
        dss_file: Union[str, Path],
        element_names: Optional[List[str]] = None,
        run_name: Optional[str] = None,
        batch_size: int = 50
    ) -> pd.DataFrame:
        """
        Get peak flow summary for all elements in a DSS file.

        Uses batched extraction for memory efficiency with large DSS files.
        Processing is done in batches to prevent memory exhaustion when
        extracting peaks from files with hundreds of elements.

        Args:
            dss_file: Path to the DSS file
            element_names: Optional list of element names to include
            run_name: Optional run name filter
            batch_size: Elements per batch (default: 50, increase for more RAM)

        Returns:
            DataFrame with columns:
                - element: Element name
                - peak_flow: Peak flow value (cfs)
                - peak_time: Time of peak (datetime)
                - units: Engineering units
                - dss_path: Full DSS pathname

        Example:
            >>> from hms_commander import HmsResults
            >>> peaks = HmsResults.get_peak_flows("results.dss")
            >>> print(peaks.nlargest(10, 'peak_flow'))

        Note:
            For very large DSS files (1000+ elements), you can adjust
            batch_size based on available memory. Lower values use less
            memory but take longer to process.
        """
        dss_file = Path(dss_file)

        # Pure delegation; progress reporting is always switched on.
        return HmsDss.get_peak_flows_batched(
            dss_file,
            element_names=element_names,
            run_name=run_name,
            batch_size=batch_size,
            progress=True
        )

    @staticmethod
    @log_call
    def get_volume_summary(
        dss_file: Union[str, Path],
        run_name: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> pd.DataFrame:
        """
        Get runoff volume summary for all elements.

        Each flow record is integrated with the trapezoidal rule over its
        actual timestamps. Flow is assumed to be in cfs; the result is
        reported in acre-feet.

        Args:
            dss_file: Path to the DSS file
            run_name: Optional run name filter
            start_time: Optional start time for volume calculation (inclusive)
            end_time: Optional end time for volume calculation (inclusive)

        Returns:
            DataFrame with columns element, total_volume_af, mean_flow,
            duration_hours and run_name, sorted by total_volume_af in
            descending order. Empty when no record could be processed.

        Example:
            >>> volumes = HmsResults.get_volume_summary("results.dss")
            >>> print(volumes)
        """
        dss_file = Path(dss_file)
        flow_paths = HmsDss.list_flow_results(dss_file)

        flow_paths = select_result_paths(
            flow_paths,
            result_type="flow",
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        records = []
        for path in flow_paths:
            # Best effort per record: one unreadable path must not abort
            # the whole summary, so failures are logged and skipped.
            try:
                parts = parse_pathname(path)
                df = HmsDss.read_timeseries(dss_file, path)

                if df.empty:
                    continue

                # Filter by time window if specified
                if start_time is not None:
                    df = df[df.index >= start_time]
                if end_time is not None:
                    df = df[df.index <= end_time]

                if df.empty:
                    continue

                flow = df.iloc[:, 0]
                volume_af = HmsResults._flow_volume_acre_feet(df.index, flow.values)

                records.append({
                    'element': parts['element_name'],
                    'total_volume_af': round(volume_af, 2),
                    'mean_flow': round(flow.mean(), 2),
                    'duration_hours': round((df.index[-1] - df.index[0]).total_seconds() / 3600, 2),
                    'run_name': parts['run_name']
                })

            except Exception as e:
                logger.warning(f"Could not process {path}: {e}")

        df = pd.DataFrame(records)
        if not df.empty:
            df = df.sort_values('total_volume_af', ascending=False)

        return df

    @staticmethod
    @log_call
    def get_hydrograph_statistics(
        dss_file: Union[str, Path],
        element_name: str,
        run_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get comprehensive statistics for a flow hydrograph.

        Args:
            dss_file: Path to the DSS file
            element_name: Name of the element
            run_name: Optional run name filter

        Returns:
            Dictionary with hydrograph statistics, or an empty dictionary
            when the series is empty. Keys: element, start_time, end_time,
            duration_hours, peak_flow, peak_time, min_flow, mean_flow,
            median_flow, std_flow, total_volume_af (acre-feet, flow assumed
            in cfs), time_to_peak_hours and centroid_time (flow-weighted
            mean time in hours after start_time; None if it could not be
            computed).

        Raises:
            ValueError: Propagated from get_outflow_timeseries when no flow
                record matches ``element_name``

        Example:
            >>> stats = HmsResults.get_hydrograph_statistics("results.dss", "Outlet")
            >>> print(f"Peak: {stats['peak_flow']} cfs at {stats['peak_time']}")
        """
        df = HmsResults.get_outflow_timeseries(dss_file, element_name, run_name)

        if df.empty:
            return {}

        flow = df['flow']

        stats = {
            'element': element_name,
            'start_time': df.index[0],
            'end_time': df.index[-1],
            'duration_hours': (df.index[-1] - df.index[0]).total_seconds() / 3600,

            # Flow statistics
            'peak_flow': flow.max(),
            'peak_time': flow.idxmax(),
            'min_flow': flow.min(),
            'mean_flow': flow.mean(),
            'median_flow': flow.median(),
            'std_flow': flow.std(),

            # Volume
            'total_volume_af': None,

            # Timing
            'time_to_peak_hours': None,
            'centroid_time': None,
        }

        # Volume in acre-feet, integrated over the real timestamps
        stats['total_volume_af'] = round(
            HmsResults._flow_volume_acre_feet(df.index, flow.values),
            2
        )

        # Time to peak from start
        stats['time_to_peak_hours'] = round(
            (stats['peak_time'] - stats['start_time']).total_seconds() / 3600,
            2
        )

        # Centroid time (flow-weighted average time). Best effort:
        # np.average raises when the weights sum to zero (all-zero flow),
        # in which case the entry stays None.
        try:
            time_numeric = (df.index - df.index[0]).total_seconds() / 3600
            stats['centroid_time'] = round(
                np.average(time_numeric, weights=flow.values),
                2
            )
        except Exception as e:
            logger.warning(f"Could not compute centroid time for '{element_name}': {e}")

        return stats

    @staticmethod
    @log_call
    def compare_runs(
        dss_files: Union[List[Union[str, Path]], Union[str, Path]],
        element_name: str,
        run_names: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Compare flow results from multiple runs.

        Args:
            dss_files: List of DSS files or single file with multiple runs
            element_name: Name of the element to compare
            run_names: Optional list of run names to compare

        Returns:
            DataFrame with one column per file/run combination, named
            "<file stem>_<run name>" (or just "<file stem>" when the record
            has no run name). Empty when nothing could be read.

        Example:
            >>> comparison = HmsResults.compare_runs(
            ...     ["run1.dss", "run2.dss", "run3.dss"],
            ...     "Outlet"
            ... )
            >>> comparison.plot()
        """
        if isinstance(dss_files, (str, Path)):
            dss_files = [dss_files]

        all_series = {}

        for dss_file in dss_files:
            dss_file = Path(dss_file)

            # Best effort per file: a failure is logged and the remaining
            # files are still processed.
            try:
                # Get flow paths for this file
                flow_paths = select_result_paths(
                    HmsDss.list_flow_results(dss_file),
                    result_type="flow",
                    element_names=[element_name],
                    exclude_tables=True,
                    result_patterns=HmsDss.HMS_RESULT_PATTERNS,
                )

                for path in flow_paths:
                    parts = parse_pathname(path)

                    if run_names and parts['run_name'] not in run_names:
                        continue

                    # Column key; a later record with the same file stem and
                    # run name replaces the earlier one.
                    key = f"{dss_file.stem}_{parts['run_name']}" if parts['run_name'] else dss_file.stem

                    df = HmsDss.read_timeseries(dss_file, path)
                    if not df.empty:
                        all_series[key] = df.iloc[:, 0]

            except Exception as e:
                logger.warning(f"Could not process {dss_file}: {e}")

        if not all_series:
            return pd.DataFrame()

        # Combine all series into a single DataFrame
        result = pd.DataFrame(all_series)
        logger.info(f"Compared {len(all_series)} runs for element '{element_name}'")

        return result

    @staticmethod
    @log_call
    def get_precipitation_summary(
        dss_file: Union[str, Path],
        run_name: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Get precipitation summary for all subbasins.

        Args:
            dss_file: Path to the DSS file
            run_name: Optional run name filter

        Returns:
            DataFrame with columns element, total_depth (sum of the series),
            max_intensity (largest single value), duration_hours and
            run_name. Empty when no record could be processed.
        """
        dss_file = Path(dss_file)
        precip_paths = HmsDss.list_precipitation_data(dss_file)

        precip_paths = select_result_paths(
            precip_paths,
            result_type="precipitation",
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        records = []
        for path in precip_paths:
            # Best effort per record: failures are logged and skipped.
            try:
                parts = parse_pathname(path)
                df = HmsDss.read_timeseries(dss_file, path)

                if df.empty:
                    continue

                precip = df.iloc[:, 0]

                records.append({
                    'element': parts['element_name'],
                    'total_depth': round(precip.sum(), 2),
                    'max_intensity': round(precip.max(), 2),
                    'duration_hours': round((df.index[-1] - df.index[0]).total_seconds() / 3600, 2),
                    'run_name': parts['run_name']
                })

            except Exception as e:
                logger.warning(f"Could not process {path}: {e}")

        return pd.DataFrame(records)

    @staticmethod
    @log_call
    def export_results_to_csv(
        dss_file: Union[str, Path],
        output_folder: Union[str, Path],
        element_names: Optional[List[str]] = None,
        run_name: Optional[str] = None
    ) -> List[str]:
        """
        Export results to CSV files for external analysis.

        Writes one "flow_<element>.csv" per extracted element, plus
        "peak_flows_summary.csv" and "volume_summary.csv" when those
        summaries are non-empty. The three export stages are independent:
        a failure in one is logged and the others still run.

        Args:
            dss_file: Path to the DSS file
            output_folder: Folder for output CSV files (created if missing)
            element_names: Optional list of elements to export; applied to
                the per-element flow files only
            run_name: Optional run name filter

        Returns:
            List of created CSV file paths

        Example:
            >>> files = HmsResults.export_results_to_csv(
            ...     "results.dss",
            ...     "csv_output",
            ...     element_names=["Outlet", "Junction-1"]
            ... )
        """
        dss_file = Path(dss_file)
        output_folder = Path(output_folder)
        output_folder.mkdir(parents=True, exist_ok=True)

        created_files = []

        # Export flow results
        try:
            results = HmsDss.extract_hms_results(
                dss_file,
                element_names=element_names,
                result_type="flow",
                run_name=run_name,
            )

            for element, df in results.items():
                filename = f"flow_{element}.csv"
                filepath = output_folder / filename
                df.to_csv(filepath)
                created_files.append(str(filepath))
                logger.info(f"Exported: {filepath}")

        except Exception as e:
            logger.warning(f"Could not export flow results: {e}")

        # Export peak summary
        try:
            peaks = HmsResults.get_peak_flows(dss_file, run_name=run_name)
            if not peaks.empty:
                filepath = output_folder / "peak_flows_summary.csv"
                peaks.to_csv(filepath, index=False)
                created_files.append(str(filepath))

        except Exception as e:
            logger.warning(f"Could not export peak summary: {e}")

        # Export volume summary
        try:
            volumes = HmsResults.get_volume_summary(dss_file, run_name)
            if not volumes.empty:
                filepath = output_folder / "volume_summary.csv"
                volumes.to_csv(filepath, index=False)
                created_files.append(str(filepath))

        except Exception as e:
            logger.warning(f"Could not export volume summary: {e}")

        logger.info(f"Exported {len(created_files)} files to {output_folder}")
        return created_files

get_outflow_timeseries(dss_file, element_name, run_name=None) staticmethod

Get outflow time series for a specific element.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_name str

Name of the element (junction, outlet, etc.)

required
run_name Optional[str]

Optional run name filter

None

Returns:

Type Description
DataFrame

DataFrame with datetime index and flow values

Example

flow = HmsResults.get_outflow_timeseries("results.dss", "Outlet")
print(flow.head())

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_outflow_timeseries(
    dss_file: Union[str, Path],
    element_name: str,
    run_name: Optional[str] = None
) -> pd.DataFrame:
    """
    Return the flow time series recorded for one element.

    Args:
        dss_file: Location of the DSS file to read
        element_name: Element whose flow record is wanted
        run_name: Run name used to narrow the candidate records, if given

    Returns:
        The DataFrame read from the first matching flow record, with its
        columns relabelled to the single name 'flow'

    Raises:
        ValueError: When no flow record matches the element

    Example:
        >>> flow = HmsResults.get_outflow_timeseries("results.dss", "Outlet")
        >>> print(flow.head())
    """
    dss_path = Path(dss_file)

    # Narrow the file's catalog down to flow records for this element.
    candidates = select_result_paths(
        HmsDss.get_catalog(dss_path),
        result_type="flow",
        element_names=[element_name],
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )
    if not candidates:
        raise ValueError(f"No flow data found for element '{element_name}'")

    # Only the leading candidate is read.
    selected = candidates[0]
    logger.info(f"Reading flow data from: {selected}")

    flow_df = HmsDss.read_timeseries(dss_path, selected)
    flow_df.columns = ['flow']
    return flow_df

get_precipitation_timeseries(dss_file, element_name, run_name=None) staticmethod

Get precipitation time series for a specific element.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_name str

Name of the subbasin or gage

required
run_name Optional[str]

Optional run name filter

None

Returns:

Type Description
DataFrame

DataFrame with datetime index and precipitation values

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_precipitation_timeseries(
    dss_file: Union[str, Path],
    element_name: str,
    run_name: Optional[str] = None
) -> pd.DataFrame:
    """
    Return the precipitation time series recorded for one element.

    Args:
        dss_file: Location of the DSS file to read
        element_name: Subbasin or gage whose precipitation record is wanted
        run_name: Run name used to narrow the candidate records, if given

    Returns:
        The DataFrame read from the first matching precipitation record,
        with its columns relabelled to the single name 'precipitation'

    Raises:
        ValueError: When no precipitation record matches the element
    """
    dss_path = Path(dss_file)

    # Narrow the file's catalog down to precipitation records for this element.
    candidates = select_result_paths(
        HmsDss.get_catalog(dss_path),
        result_type="precipitation",
        element_names=[element_name],
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )
    if not candidates:
        raise ValueError(f"No precipitation data found for element '{element_name}'")

    # Only the leading candidate is read.
    precip_df = HmsDss.read_timeseries(dss_path, candidates[0])
    precip_df.columns = ['precipitation']
    return precip_df

get_peak_flows(dss_file, element_names=None, run_name=None, batch_size=50) staticmethod

Get peak flow summary for all elements in a DSS file.

Uses batched extraction for memory efficiency with large DSS files. Processing is done in batches to prevent memory exhaustion when extracting peaks from files with hundreds of elements.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_names Optional[List[str]]

Optional list of element names to include

None
run_name Optional[str]

Optional run name filter

None
batch_size int

Elements per batch (default: 50, increase for more RAM)

50

Returns:

Type Description
DataFrame

DataFrame with columns:

- element: Element name
- peak_flow: Peak flow value (cfs)
- peak_time: Time of peak (datetime)
- units: Engineering units
- dss_path: Full DSS pathname

Example

from hms_commander import HmsResults
peaks = HmsResults.get_peak_flows("results.dss")
print(peaks.nlargest(10, 'peak_flow'))

Note

For very large DSS files (1000+ elements), you can adjust batch_size based on available memory. Lower values use less memory but take longer to process.

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_peak_flows(
    dss_file: Union[str, Path],
    element_names: Optional[List[str]] = None,
    run_name: Optional[str] = None,
    batch_size: int = 50
) -> pd.DataFrame:
    """
    Summarize the peak flow of the elements stored in a DSS file.

    This is a thin wrapper: the file path is normalized to a ``Path`` and
    the work is handed to ``HmsDss.get_peak_flows_batched`` with progress
    reporting switched on.

    Args:
        dss_file: Location of the DSS file to read
        element_names: Restrict the summary to these elements, if given
        run_name: Run name used to narrow the records, if given
        batch_size: Number of elements handled per batch (default: 50)

    Returns:
        DataFrame with columns:
            - element: Element name
            - peak_flow: Peak flow value (cfs)
            - peak_time: Time of peak (datetime)
            - units: Engineering units
            - dss_path: Full DSS pathname

    Example:
        >>> from hms_commander import HmsResults
        >>> peaks = HmsResults.get_peak_flows("results.dss")
        >>> print(peaks.nlargest(10, 'peak_flow'))

    Note:
        For very large DSS files (1000+ elements), batch_size can be tuned
        to the memory available. Smaller batches need less memory but take
        longer to process.
    """
    # Filters and batching options are forwarded unchanged.
    batch_options = {
        "element_names": element_names,
        "run_name": run_name,
        "batch_size": batch_size,
        "progress": True,
    }
    return HmsDss.get_peak_flows_batched(Path(dss_file), **batch_options)

get_volume_summary(dss_file, run_name=None, start_time=None, end_time=None) staticmethod

Get runoff volume summary for all elements.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
run_name Optional[str]

Optional run name filter

None
start_time Optional[datetime]

Optional start time for volume calculation

None
end_time Optional[datetime]

Optional end time for volume calculation

None

Returns:

Type Description
DataFrame

DataFrame with element names and total volumes

Example

volumes = HmsResults.get_volume_summary("results.dss")
print(volumes)

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_volume_summary(
    dss_file: Union[str, Path],
    run_name: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None
) -> pd.DataFrame:
    """
    Get runoff volume summary for all elements.

    Each flow record is integrated with the trapezoidal rule over its
    actual timestamps. Flow is assumed to be in cfs; the result is
    reported in acre-feet.

    Args:
        dss_file: Path to the DSS file
        run_name: Optional run name filter
        start_time: Optional start time for volume calculation (inclusive)
        end_time: Optional end time for volume calculation (inclusive)

    Returns:
        DataFrame with columns element, total_volume_af, mean_flow,
        duration_hours and run_name, sorted by total_volume_af in
        descending order. Empty when no record could be processed.

    Example:
        >>> volumes = HmsResults.get_volume_summary("results.dss")
        >>> print(volumes)
    """
    dss_file = Path(dss_file)
    flow_paths = HmsDss.list_flow_results(dss_file)

    flow_paths = select_result_paths(
        flow_paths,
        result_type="flow",
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

    records = []
    for path in flow_paths:
        # Best effort per record: one unreadable path must not abort the
        # whole summary, so failures are logged and skipped.
        try:
            parts = parse_pathname(path)
            df = HmsDss.read_timeseries(dss_file, path)

            if df.empty:
                continue

            # Filter by time window if specified
            if start_time is not None:
                df = df[df.index >= start_time]
            if end_time is not None:
                df = df[df.index <= end_time]

            if df.empty:
                continue

            flow = df.iloc[:, 0]
            flow_values = np.asarray(flow.values, dtype=float)

            # Seconds since the first sample: cfs * s = cubic feet.
            elapsed_s = np.asarray(
                (df.index - df.index[0]).total_seconds(), dtype=float
            )

            # Trapezoidal rule over the real (possibly irregular) time
            # steps; a single sample yields 0 because it spans no time.
            cubic_feet = float(
                np.sum((flow_values[1:] + flow_values[:-1]) * np.diff(elapsed_s)) / 2.0
            )

            # 1 acre-foot = 43,560 cubic feet.
            volume_af = cubic_feet / 43560.0

            records.append({
                'element': parts['element_name'],
                'total_volume_af': round(volume_af, 2),
                'mean_flow': round(flow.mean(), 2),
                'duration_hours': round((df.index[-1] - df.index[0]).total_seconds() / 3600, 2),
                'run_name': parts['run_name']
            })

        except Exception as e:
            logger.warning(f"Could not process {path}: {e}")

    df = pd.DataFrame(records)
    if not df.empty:
        df = df.sort_values('total_volume_af', ascending=False)

    return df

get_hydrograph_statistics(dss_file, element_name, run_name=None) staticmethod

Get comprehensive statistics for a flow hydrograph.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_name str

Name of the element

required
run_name Optional[str]

Optional run name filter

None

Returns:

Type Description
Dict[str, Any]

Dictionary with hydrograph statistics

Example

stats = HmsResults.get_hydrograph_statistics("results.dss", "Outlet")
print(f"Peak: {stats['peak_flow']} cfs at {stats['peak_time']}")

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_hydrograph_statistics(
    dss_file: Union[str, Path],
    element_name: str,
    run_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get comprehensive statistics for a flow hydrograph.

    Args:
        dss_file: Path to the DSS file
        element_name: Name of the element
        run_name: Optional run name filter

    Returns:
        Dictionary with hydrograph statistics, or an empty dictionary when
        the series is empty. Keys: element, start_time, end_time,
        duration_hours, peak_flow, peak_time, min_flow, mean_flow,
        median_flow, std_flow, total_volume_af (acre-feet, flow assumed in
        cfs), time_to_peak_hours and centroid_time (flow-weighted mean time
        in hours after start_time; None if it could not be computed).

    Raises:
        ValueError: Propagated from get_outflow_timeseries when no flow
            record matches ``element_name``

    Example:
        >>> stats = HmsResults.get_hydrograph_statistics("results.dss", "Outlet")
        >>> print(f"Peak: {stats['peak_flow']} cfs at {stats['peak_time']}")
    """
    df = HmsResults.get_outflow_timeseries(dss_file, element_name, run_name)

    if df.empty:
        return {}

    flow = df['flow']

    stats = {
        'element': element_name,
        'start_time': df.index[0],
        'end_time': df.index[-1],
        'duration_hours': (df.index[-1] - df.index[0]).total_seconds() / 3600,

        # Flow statistics
        'peak_flow': flow.max(),
        'peak_time': flow.idxmax(),
        'min_flow': flow.min(),
        'mean_flow': flow.mean(),
        'median_flow': flow.median(),
        'std_flow': flow.std(),

        # Volume
        'total_volume_af': None,

        # Timing
        'time_to_peak_hours': None,
        'centroid_time': None,
    }

    # Volume in acre-feet: trapezoidal rule over the real timestamps.
    # cfs * seconds = cubic feet, and 1 acre-foot = 43,560 cubic feet.
    # A single sample yields 0 because it spans no time.
    flow_values = np.asarray(flow.values, dtype=float)
    elapsed_s = np.asarray((df.index - df.index[0]).total_seconds(), dtype=float)
    cubic_feet = float(
        np.sum((flow_values[1:] + flow_values[:-1]) * np.diff(elapsed_s)) / 2.0
    )
    stats['total_volume_af'] = round(cubic_feet / 43560.0, 2)

    # Time to peak from start
    stats['time_to_peak_hours'] = round(
        (stats['peak_time'] - stats['start_time']).total_seconds() / 3600,
        2
    )

    # Centroid time (flow-weighted average time). Best effort: np.average
    # raises when the weights sum to zero (all-zero flow), in which case
    # the entry stays None.
    try:
        time_numeric = (df.index - df.index[0]).total_seconds() / 3600
        stats['centroid_time'] = round(
            np.average(time_numeric, weights=flow.values),
            2
        )
    except Exception as e:
        logger.warning(f"Could not compute centroid time for '{element_name}': {e}")

    return stats

compare_runs(dss_files, element_name, run_names=None) staticmethod

Compare flow results from multiple runs.

Parameters:

Name Type Description Default
dss_files Union[List[Union[str, Path]], Union[str, Path]]

List of DSS files or single file with multiple runs

required
element_name str

Name of the element to compare

required
run_names Optional[List[str]]

Optional list of run names to compare

None

Returns:

Type Description
DataFrame

DataFrame with time series from all runs for comparison

Example

comparison = HmsResults.compare_runs(
    ["run1.dss", "run2.dss", "run3.dss"],
    "Outlet"
)
comparison.plot()

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def compare_runs(
    dss_files: Union[List[Union[str, Path]], Union[str, Path]],
    element_name: str,
    run_names: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Gather one element's flow series from several runs into one table.

    Args:
        dss_files: Several DSS files, or one file that holds multiple runs
        element_name: Element whose flow records are compared
        run_names: Keep only records whose run name is listed, if given

    Returns:
        DataFrame with one column per file/run combination, labelled
        "<file stem>_<run name>" (or just "<file stem>" when the record
        carries no run name). An empty DataFrame when nothing was read.

    Example:
        >>> comparison = HmsResults.compare_runs(
        ...     ["run1.dss", "run2.dss", "run3.dss"],
        ...     "Outlet"
        ... )
        >>> comparison.plot()
    """
    # A lone path is treated as a one-item list.
    sources = [dss_files] if isinstance(dss_files, (str, Path)) else dss_files

    series_by_label = {}

    for source in sources:
        source_path = Path(source)

        # A failure in one file is logged; the remaining files still run.
        try:
            candidates = select_result_paths(
                HmsDss.list_flow_results(source_path),
                result_type="flow",
                element_names=[element_name],
                exclude_tables=True,
                result_patterns=HmsDss.HMS_RESULT_PATTERNS,
            )

            for pathname in candidates:
                run = parse_pathname(pathname)['run_name']

                if run_names and run not in run_names:
                    continue

                # Column label; a repeated label replaces the earlier series.
                if run:
                    label = f"{source_path.stem}_{run}"
                else:
                    label = source_path.stem

                data = HmsDss.read_timeseries(source_path, pathname)
                if data.empty:
                    continue
                series_by_label[label] = data.iloc[:, 0]

        except Exception as exc:
            logger.warning(f"Could not process {source_path}: {exc}")

    if not series_by_label:
        return pd.DataFrame()

    # pandas aligns the individual series when the table is built.
    combined = pd.DataFrame(series_by_label)
    logger.info(f"Compared {len(series_by_label)} runs for element '{element_name}'")

    return combined

get_precipitation_summary(dss_file, run_name=None) staticmethod

Get precipitation summary for all subbasins.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
run_name Optional[str]

Optional run name filter

None

Returns:

Type Description
DataFrame

DataFrame with precipitation statistics by subbasin

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def get_precipitation_summary(
    dss_file: Union[str, Path],
    run_name: Optional[str] = None
) -> pd.DataFrame:
    """
    Summarise each selected precipitation record in a DSS file.

    For every precipitation pathname selected from the file, the first column
    of its time series is reduced to a sum, a maximum and the elapsed time
    between the first and last index entries.

    Args:
        dss_file: Path to the DSS file.
        run_name: Optional run name passed on to the pathname selection.

    Returns:
        DataFrame with one row per readable record and the columns
        ``element``, ``total_depth``, ``max_intensity``, ``duration_hours``
        and ``run_name``. Records with an empty time series are left out;
        records that raise while being processed are logged as warnings and
        left out.
    """
    dss_file = Path(dss_file)

    selected = select_result_paths(
        HmsDss.list_precipitation_data(dss_file),
        result_type="precipitation",
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

    rows = []
    for pathname in selected:
        try:
            fields = parse_pathname(pathname)
            frame = HmsDss.read_timeseries(dss_file, pathname)

            if not frame.empty:
                series = frame.iloc[:, 0]

                element = fields['element_name']
                total = round(series.sum(), 2)
                peak = round(series.max(), 2)
                # Elapsed time between first and last timestamps, in hours.
                hours = round(
                    (frame.index[-1] - frame.index[0]).total_seconds() / 3600,
                    2,
                )
                run = fields['run_name']

                rows.append({
                    'element': element,
                    'total_depth': total,
                    'max_intensity': peak,
                    'duration_hours': hours,
                    'run_name': run,
                })

        except Exception as exc:
            logger.warning(f"Could not process {pathname}: {exc}")

    return pd.DataFrame(rows)

export_results_to_csv(dss_file, output_folder, element_names=None, run_name=None) staticmethod

Export results to CSV files for external analysis.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
output_folder Union[str, Path]

Folder for output CSV files

required
element_names Optional[List[str]]

Optional list of elements to export

None
run_name Optional[str]

Optional run name filter

None

Returns:

Type Description
List[str]

List of created CSV file paths

Example

>>> files = HmsResults.export_results_to_csv(
...     "results.dss",
...     "csv_output",
...     element_names=["Outlet", "Junction-1"]
... )

Source code in hms_commander/HmsResults.py
@staticmethod
@log_call
def export_results_to_csv(
    dss_file: Union[str, Path],
    output_folder: Union[str, Path],
    element_names: Optional[List[str]] = None,
    run_name: Optional[str] = None
) -> List[str]:
    """
    Export results to CSV files for external analysis.

    Writes one ``flow_<element>.csv`` per extracted flow result, then
    ``peak_flows_summary.csv`` and ``volume_summary.csv`` when those summaries
    are non-empty. The output folder is created if it does not exist.

    Each of the three stages is best-effort: a failure is logged as a warning
    and the remaining stages still run. Within the flow stage, a failure to
    write one element's file does not stop the other elements from being
    written.

    Args:
        dss_file: Path to the DSS file
        output_folder: Folder for output CSV files
        element_names: Optional list of elements to export. Applied to the
            per-element flow files only; the two summary files are requested
            with ``run_name`` alone.
        run_name: Optional run name filter

    Returns:
        List of created CSV file paths (as strings), in the order written.

    Example:
        >>> files = HmsResults.export_results_to_csv(
        ...     "results.dss",
        ...     "csv_output",
        ...     element_names=["Outlet", "Junction-1"]
        ... )
    """
    # Local import: only needed here, for the platform's path separators.
    import os

    dss_file = Path(dss_file)
    output_folder = Path(output_folder)
    output_folder.mkdir(parents=True, exist_ok=True)

    created_files = []

    # An element name containing a path separator would point the write at a
    # sub-directory instead of a file inside output_folder, so separators are
    # replaced with "_" in the file name. os.altsep is None on POSIX.
    # Names that differ only by a separator vs "_" map to the same file.
    separators = [sep for sep in (os.sep, os.altsep) if sep]

    # Export flow results
    try:
        results = HmsDss.extract_hms_results(
            dss_file,
            element_names=element_names,
            result_type="flow",
            run_name=run_name,
        )

        for element, df in results.items():
            safe_name = str(element)
            for sep in separators:
                safe_name = safe_name.replace(sep, "_")

            filepath = output_folder / f"flow_{safe_name}.csv"

            # Isolate failures per element so one bad write does not drop
            # every element that follows it.
            try:
                df.to_csv(filepath)
            except Exception as e:
                logger.warning(
                    f"Could not export flow results for '{element}': {e}"
                )
                continue

            created_files.append(str(filepath))
            logger.info(f"Exported: {filepath}")

    except Exception as e:
        logger.warning(f"Could not export flow results: {e}")

    # Export peak summary
    try:
        peaks = HmsResults.get_peak_flows(dss_file, run_name=run_name)
        if not peaks.empty:
            filepath = output_folder / "peak_flows_summary.csv"
            peaks.to_csv(filepath, index=False)
            created_files.append(str(filepath))

    except Exception as e:
        logger.warning(f"Could not export peak summary: {e}")

    # Export volume summary
    try:
        volumes = HmsResults.get_volume_summary(dss_file, run_name)
        if not volumes.empty:
            filepath = output_folder / "volume_summary.csv"
            volumes.to_csv(filepath, index=False)
            created_files.append(str(filepath))

    except Exception as e:
        logger.warning(f"Could not export volume summary: {e}")

    logger.info(f"Exported {len(created_files)} files to {output_folder}")
    return created_files
CLB Engineering Corporation  ·  LLM Forward Engineering
HMS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.