Skip to content

HmsDss

DSS file operations for HEC-HMS results.

hms_commander.HmsDss

DSS file operations for HMS input/output.

Uses hms-commander's standalone DSS implementation powered by HEC Monolith libraries. Provides HMS-specific convenience methods for working with precipitation, discharge, and other hydrologic time series.

All methods are static - no instantiation required.

Dependencies
  • pyjnius (pip install pyjnius)
  • Java 8+ (JRE or JDK)
  • HEC Monolith libraries (auto-downloaded on first use)
Example

from hms_commander import HmsDss catalog = HmsDss.get_catalog("results.dss") ts = HmsDss.read_timeseries("results.dss", "/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/") print(f"Units: {ts.attrs['units']}")

Source code in hms_commander/dss/hms_dss.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
class HmsDss:
    """
    DSS file operations for HMS input/output.

    Uses hms-commander's standalone DSS implementation powered by HEC Monolith
    libraries. Provides HMS-specific convenience methods for working with
    precipitation, discharge, and other hydrologic time series.

    All methods are static - no instantiation required.

    Dependencies:
        - pyjnius (pip install pyjnius)
        - Java 8+ (JRE or JDK)
        - HEC Monolith libraries (auto-downloaded on first use)

    Example:
        >>> from hms_commander import HmsDss
        >>> catalog = HmsDss.get_catalog("results.dss")
        >>> ts = HmsDss.read_timeseries("results.dss", "/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
        >>> print(f"Units: {ts.attrs['units']}")
    """

    # Common HMS DSS path patterns (C-part matching)
    # HMS uses formats like FLOW, FLOW-OBSERVED, FLOW-DIRECT, FLOW-BASE, etc.
    HMS_RESULT_PATTERNS = {
        'flow': r'/FLOW[^/]*/|/FLOW/',             # Matches FLOW, FLOW-DIRECT, FLOW-COMBINE, etc.
        'flow-total': r'/FLOW/',                    # Only total FLOW
        'flow-observed': r'/FLOW-OBSERVED/',
        'flow-direct': r'/FLOW-DIRECT/',
        'flow-base': r'/FLOW-BASE/',
        'flow-combine': r'/FLOW-COMBINE/',
        'precipitation': r'/PRECIP[^/]*/|/PRECIP/', # Matches PRECIP, PRECIP-INC, PRECIP-CUM
        'precip-inc': r'/PRECIP-INC/',
        'precip-cum': r'/PRECIP-CUM/',
        'precip-excess': r'/PRECIP-EXCESS/',
        'precip-loss': r'/PRECIP-LOSS/',
        'stage': r'/STAGE/',
        'storage': r'/STORAGE[^/]*/|/STORAGE/',
        'storage-gw': r'/STORAGE-GW/',
        'storage-soil': r'/STORAGE-SOIL/',
        'elevation': r'/ELEV/',
        'outflow': r'/OUTFLOW[^/]*/|/OUTFLOW/',
        'inflow': r'/INFLOW[^/]*/|/INFLOW/',
        'excess': r'/EXCESS[^/]*/|/EXCESS/',
        'baseflow': r'/BASEFLOW/',
        'infiltration': r'/INFILTRATION/',
        'et': r'/ET[^/]*/|/ET/',
    }

    @staticmethod
    def is_available() -> bool:
        """
        Check if full DSS functionality is available.

        Returns:
            True if pyjnius can be imported and DSS operations will work

        Example:
            >>> if HmsDss.is_available():
            ...     catalog = HmsDss.get_catalog("file.dss")
            ... else:
            ...     print("Install pyjnius: pip install pyjnius")
        """
        if not DSS_AVAILABLE:
            return False
        return DssCore.is_available()

    @staticmethod
    @log_call
    def get_catalog(
        dss_file: Union[str, Path]
    ) -> List[str]:
        """
        Get catalog of all paths in a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            List of DSS pathnames

        Example:
            >>> paths = HmsDss.get_catalog("results.dss")
            >>> for path in paths:
            ...     print(path)
        """
        dss_file = Path(dss_file)

        if not dss_file.exists():
            raise FileNotFoundError(f"DSS file not found: {dss_file}")

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius\n"
                "Also requires Java 8+ (JRE or JDK)"
            )

        return DssCore.get_catalog(dss_file)

    @staticmethod
    @log_call
    def read_timeseries(
        dss_file: Union[str, Path],
        pathname: str
    ) -> pd.DataFrame:
        """
        Read a time series from a DSS file.

        Args:
            dss_file: Path to the DSS file
            pathname: DSS pathname to read

        Returns:
            DataFrame with:
            - DatetimeIndex for time series operations
            - 'datetime' column for plotting (same as index)
            - 'value' column with time series data
            - Metadata via df.attrs: pathname, units, type, interval

        Example:
            >>> ts = HmsDss.read_timeseries(
            ...     "results.dss",
            ...     "/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/"
            ... )
            >>> print(ts.head())
            >>> print(f"Units: {ts.attrs['units']}")
            >>>
            >>> # Plotting options:
            >>> ax.plot(ts['datetime'], ts['value'])    # Using datetime column
            >>> ax.plot(ts.index, ts['value'])          # Using DatetimeIndex
            >>> ts.plot(y='value')                      # Pandas automatic
        """
        dss_file = Path(dss_file)

        if not dss_file.exists():
            raise FileNotFoundError(f"DSS file not found: {dss_file}")

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius"
            )

        return DssCore.read_timeseries(dss_file, pathname)

    @staticmethod
    @log_call
    def read_multiple_timeseries(
        dss_file: Union[str, Path],
        pathnames: List[str]
    ) -> Dict[str, pd.DataFrame]:
        """
        Read multiple time series from a DSS file.

        Args:
            dss_file: Path to the DSS file
            pathnames: List of DSS pathnames to read

        Returns:
            Dictionary mapping pathnames to DataFrames (None on failure)

        Example:
            >>> paths = ["/BASIN/SUB1/FLOW//15MIN/RUN1/", "/BASIN/SUB2/FLOW//15MIN/RUN1/"]
            >>> data = HmsDss.read_multiple_timeseries("results.dss", paths)
            >>> for path, df in data.items():
            ...     if df is not None:
            ...         print(f"{path}: {len(df)} points")
        """
        dss_file = Path(dss_file)

        if not DSS_AVAILABLE:
            raise ImportError("DSS functionality requires pyjnius.")

        return DssCore.read_multiple_timeseries(dss_file, pathnames)

    @staticmethod
    @log_call
    def extract_hms_results(
        dss_file: Union[str, Path],
        element_names: Optional[List[str]] = None,
        result_type: str = "flow",
        run_name: Optional[str] = None,
    ) -> Dict[str, pd.DataFrame]:
        """
        Extract HMS simulation results from a DSS file.

        This is a convenience method that filters results by element name
        and result type.

        Args:
            dss_file: Path to the DSS file
            element_names: List of element names to extract (all if None)
            result_type: Type of result ("flow", "precipitation", "stage", etc.)
            run_name: Optional F-part run name filter

        Returns:
            Dictionary mapping element names to result DataFrames

        Example:
            >>> results = HmsDss.extract_hms_results(
            ...     "results.dss",
            ...     element_names=["Outlet", "Junction-1"],
            ...     result_type="flow"
            ... )
            >>> for name, df in results.items():
            ...     print(f"{name}: peak = {df['value'].max():.2f}")
        """
        dss_file = Path(dss_file)

        if not DSS_AVAILABLE:
            raise ImportError("DSS functionality requires pyjnius.")

        catalog = HmsDss.get_catalog(dss_file)
        matching_paths = select_result_paths(
            catalog,
            result_type=result_type,
            element_names=element_names,
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        # Read matching time series
        results = {}
        for path in matching_paths:
            parts = parse_pathname(path)
            element_name = parts["element_name"]
            try:
                df = HmsDss.read_timeseries(dss_file, path)
                results[element_name] = df
            except Exception as e:
                logger.warning(f"Could not read {path}: {e}")

        logger.info(f"Extracted {len(results)} result time series")
        return results

    @staticmethod
    @log_call
    def get_info(
        dss_file: Union[str, Path]
    ) -> Dict[str, Any]:
        """
        Get summary information about a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            Dictionary with file information including path count and data types

        Example:
            >>> info = HmsDss.get_info("results.dss")
            >>> print(f"Total paths: {info['num_paths']}")
            >>> print(f"Data types: {info['path_types']}")
        """
        dss_file = Path(dss_file)

        info = {
            'file_path': str(dss_file),
            'file_exists': dss_file.exists(),
            'file_size_mb': None,
            'num_paths': None,
            'path_types': {},
            'dss_available': DSS_AVAILABLE
        }

        if dss_file.exists():
            info['file_size_mb'] = round(dss_file.stat().st_size / (1024 * 1024), 2)

            if DSS_AVAILABLE:
                try:
                    catalog = HmsDss.get_catalog(dss_file)
                    info['num_paths'] = len(catalog)

                    # Categorize paths by C-part
                    for path in catalog:
                        parts = path.split('/')
                        if len(parts) >= 4:
                            data_type = parts[3]  # C part
                            info['path_types'][data_type] = info['path_types'].get(data_type, 0) + 1

                except Exception as e:
                    logger.warning(f"Could not read DSS catalog: {e}")

        return info

    @staticmethod
    @log_call
    def list_flow_results(
        dss_file: Union[str, Path]
    ) -> List[str]:
        """
        List all flow result pathnames in a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            List of flow result pathnames

        Example:
            >>> flows = HmsDss.list_flow_results("results.dss")
            >>> for path in flows:
            ...     parts = HmsDss.parse_dss_pathname(path)
            ...     print(f"{parts['element_name']}: {path}")
        """
        catalog = HmsDss.get_catalog(dss_file)
        return select_result_paths(
            catalog,
            result_type="flow",
            exclude_tables=False,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

    @staticmethod
    @log_call
    def list_precipitation_data(
        dss_file: Union[str, Path]
    ) -> List[str]:
        """
        List all precipitation data pathnames in a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            List of precipitation pathnames
        """
        catalog = HmsDss.get_catalog(dss_file)
        return select_result_paths(
            catalog,
            result_type="precipitation",
            exclude_tables=False,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

    @staticmethod
    @log_call
    def list_stage_results(
        dss_file: Union[str, Path]
    ) -> List[str]:
        """
        List all stage result pathnames in a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            List of stage result pathnames
        """
        catalog = HmsDss.get_catalog(dss_file)
        return select_result_paths(
            catalog,
            result_type="stage",
            exclude_tables=False,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

    @staticmethod
    @log_call
    def list_storage_results(
        dss_file: Union[str, Path]
    ) -> List[str]:
        """
        List all storage result pathnames in a DSS file.

        Args:
            dss_file: Path to the DSS file

        Returns:
            List of storage result pathnames
        """
        catalog = HmsDss.get_catalog(dss_file)
        return select_result_paths(
            catalog,
            result_type="storage",
            exclude_tables=False,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

    @staticmethod
    @log_call
    def parse_dss_pathname(pathname: str) -> Dict[str, str]:
        """
        Parse a DSS pathname into its component parts.

        DSS pathnames have format: /A/B/C/D/E/F/
        - A: Basin/Project identifier
        - B: Location/Element name
        - C: Data type (FLOW, PRECIP, etc.)
        - D: Date/Time block
        - E: Time interval
        - F: Version/Run identifier

        Args:
            pathname: DSS pathname string

        Returns:
            Dictionary with pathname components including convenience fields

        Example:
            >>> parts = HmsDss.parse_dss_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
            >>> print(parts['element_name'])  # 'OUTLET'
            >>> print(parts['data_type'])     # 'FLOW'
            >>> print(parts['run_name'])      # 'RUN1'
        """
        return parse_pathname(pathname)

    @staticmethod
    @log_call
    def create_dss_pathname(
        basin: str,
        element: str,
        data_type: str,
        interval: str,
        run_name: str = "",
        date_block: str = ""
    ) -> str:
        """
        Create a DSS pathname from components.

        Args:
            basin: Basin/Project name (A part)
            element: Element name (B part)
            data_type: Data type like FLOW, PRECIP (C part)
            interval: Time interval like 15MIN, 1HOUR (E part)
            run_name: Run identifier (F part)
            date_block: Date block (D part, usually empty)

        Returns:
            Formatted DSS pathname

        Example:
            >>> path = HmsDss.create_dss_pathname(
            ...     "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1"
            ... )
            >>> print(path)  # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'
        """
        return create_pathname(basin, element, data_type, interval, run_name, date_block)

    @staticmethod
    @log_call
    def filter_catalog(
        catalog: List[str],
        pattern: Optional[str] = None,
        data_type: Optional[str] = None,
        element: Optional[str] = None
    ) -> List[str]:
        """
        Filter DSS catalog by pattern or components.

        Args:
            catalog: List of DSS pathnames
            pattern: Regex pattern to match against full pathname
            data_type: Filter by C-part (e.g., "FLOW", "PRECIP")
            element: Filter by B-part (element/location name)

        Returns:
            Filtered list of pathnames

        Example:
            >>> paths = HmsDss.get_catalog("file.dss")
            >>> flow_paths = HmsDss.filter_catalog(paths, data_type="FLOW")
            >>> outlet_flows = HmsDss.filter_catalog(flow_paths, element="OUTLET")
        """
        return filter_catalog(catalog, pattern, data_type, element)

    @staticmethod
    @log_call
    def get_peak_flows(
        dss_file: Union[str, Path],
        element_names: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Extract peak flow values for all elements.

        Note:
            For large DSS files with many elements (100+), consider using
            get_peak_flows_batched() instead to avoid memory issues. That method
            processes elements in batches with garbage collection between batches.

        Args:
            dss_file: Path to the DSS file
            element_names: Optional list of element names to filter

        Returns:
            DataFrame with columns: element, peak_flow, peak_time, units

        Example:
            >>> peaks = HmsDss.get_peak_flows("results.dss")
            >>> print(peaks.sort_values('peak_flow', ascending=False))

        See Also:
            get_peak_flows_batched: Memory-efficient version for large files
        """
        results = HmsDss.extract_hms_results(dss_file, element_names, result_type="flow")

        records = []
        for element_name, df in results.items():
            if df is not None and not df.empty:
                peak_idx = df['value'].idxmax()
                records.append({
                    'element': element_name,
                    'peak_flow': df.loc[peak_idx, 'value'],
                    'peak_time': peak_idx,
                    'units': df.attrs.get('units', '')
                })

        return pd.DataFrame(records)

    @staticmethod
    @log_call
    def get_peak_flows_batched(
        dss_file: Union[str, Path],
        element_names: Optional[List[str]] = None,
        run_name: Optional[str] = None,
        batch_size: int = 50,
        progress: bool = True
    ) -> pd.DataFrame:
        """
        Extract peak flows in batches to avoid memory issues.

        Processes N elements at a time, garbage collects between batches,
        preventing memory exhaustion for large DSS files.

        Args:
            dss_file: Path to DSS file
            element_names: Optional filter for specific elements
            run_name: Optional filter for specific run (e.g., "1%(100YR)RUN")
            batch_size: Elements to process per batch (default: 50)
            progress: Show progress logging (default: True)

        Returns:
            DataFrame with columns:
                - element: Element name
                - peak_flow: Peak flow value (cfs)
                - peak_time: Time of peak (datetime)
                - units: Engineering units
                - dss_path: Full DSS pathname

        Memory: O(batch_size) instead of O(total_elements)

        Example:
            >>> # Safe for 1000+ elements
            >>> peaks = HmsDss.get_peak_flows_batched(
            ...     "results.dss",
            ...     run_name="1%(100YR)RUN",
            ...     batch_size=100  # Adjust for available memory
            ... )
        """
        dss_file = Path(dss_file)

        if not dss_file.exists():
            raise FileNotFoundError(f"DSS file not found: {dss_file}")

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius\n"
                "Also requires Java 8+ (JRE or JDK)"
            )

        catalog = HmsDss.get_catalog(dss_file)
        flow_paths = select_result_paths(
            catalog,
            result_type="flow-total",
            element_names=element_names,
            run_name=run_name,
            exclude_tables=True,
            result_patterns=HmsDss.HMS_RESULT_PATTERNS,
        )

        total_paths = len(flow_paths)
        if total_paths == 0:
            logger.info("No flow paths found in DSS file")
            return pd.DataFrame(columns=['element', 'peak_flow', 'peak_time', 'units', 'dss_path'])

        if progress:
            logger.info(f"Extracting peaks from {total_paths} paths...")

        records = []
        total_batches = (total_paths + batch_size - 1) // batch_size  # Ceiling division

        for i in range(0, total_paths, batch_size):
            batch_num = i // batch_size + 1
            batch_paths = flow_paths[i:i + batch_size]

            if progress:
                logger.info(f"Batch {batch_num}/{total_batches}: processing {len(batch_paths)} paths...")

            # Process each path in the batch
            for path in batch_paths:
                try:
                    parts = HmsDss.parse_dss_pathname(path)

                    # Use peak-only extraction (350x more memory efficient)
                    peak_info = DssCore.get_peak_value(dss_file, path)

                    if peak_info is not None:
                        records.append({
                            'element': parts['element_name'],
                            'peak_flow': peak_info['peak_flow'],
                            'peak_time': peak_info['peak_time'],
                            'units': peak_info['units'],
                            'dss_path': path
                        })

                except Exception as e:
                    logger.warning(f"Could not read {path}: {e}")

            # Garbage collect after each batch to free memory
            gc.collect()

        # Create DataFrame from records
        result_df = pd.DataFrame(records)

        # Sort by peak_flow descending
        if not result_df.empty:
            result_df = result_df.sort_values('peak_flow', ascending=False).reset_index(drop=True)

        if progress:
            logger.info(f"Extracted peak flows for {len(result_df)} elements")
        return result_df

    @staticmethod
    @log_call
    def get_total_precipitation(
        dss_file: Union[str, Path],
        element_names: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """
        Calculate total precipitation for all elements.

        Args:
            dss_file: Path to the DSS file
            element_names: Optional list of element names to filter

        Returns:
            DataFrame with columns: element, total_precip, units, num_intervals

        Example:
            >>> precip = HmsDss.get_total_precipitation("results.dss")
            >>> print(precip)
        """
        results = HmsDss.extract_hms_results(
            dss_file, element_names, result_type="precipitation"
        )

        records = []
        for element_name, df in results.items():
            if df is not None and not df.empty:
                records.append({
                    'element': element_name,
                    'total_precip': df['value'].sum(),
                    'units': df.attrs.get('units', ''),
                    'num_intervals': len(df)
                })

        return pd.DataFrame(records)

    @staticmethod
    @log_call
    def write_paired_data(
        dss_file: Union[str, Path],
        pathname: str,
        x_values,
        y_values,
        x_units: str = "HOURS",
        y_units: str = "FRACTION",
        x_label: str = "TIME",
        y_label: str = "CUMULATIVE"
    ) -> bool:
        """
        Write paired data (X-Y curve) to DSS file.

        This is used for Atlas 14 temporal distributions, rating curves,
        and other X-Y relationships.

        Args:
            dss_file: Path to DSS file (created if doesn't exist)
            pathname: DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")
            x_values: X coordinates (e.g., time in hours) - numpy array or list
            y_values: Y coordinates (e.g., cumulative fraction 0-1) - numpy array or list
            x_units: Units for X values (default: "HOURS")
            y_units: Units for Y values (default: "FRACTION")
            x_label: Label for X axis (default: "TIME")
            y_label: Label for Y axis (default: "CUMULATIVE")

        Returns:
            True if write succeeded, False otherwise

        Example:
            >>> import numpy as np
            >>> x = np.linspace(0, 24, 49)  # 0 to 24 hours
            >>> y = np.linspace(0, 1, 49)   # 0 to 100% cumulative
            >>> HmsDss.write_paired_data(
            ...     "temporal.dss",
            ...     "//TX_R3/ALL-CASES/24HR///50%/",
            ...     x, y
            ... )
        """
        import numpy as np
        dss_file = Path(dss_file)

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius\n"
                "Also requires Java 8+ (JRE or JDK)"
            )

        # Convert to numpy arrays if needed
        x_values = np.asarray(x_values)
        y_values = np.asarray(y_values)

        return DssCore.write_paired_data(
            dss_file, pathname, x_values, y_values,
            x_units, y_units, x_label, y_label
        )

    @staticmethod
    @log_call
    def write_multiple_paired_data(
        dss_file: Union[str, Path],
        paired_data_records: List[Dict]
    ) -> Dict[str, bool]:
        """
        Write multiple paired data records to DSS file.

        More efficient than calling write_paired_data repeatedly as it
        keeps the DSS file open for all writes.

        Args:
            dss_file: Path to DSS file
            paired_data_records: List of dicts with keys:
                - pathname: DSS pathname
                - x_values: numpy array of X values
                - y_values: numpy array of Y values
                - x_units: (optional) X units
                - y_units: (optional) Y units
                - x_label: (optional) X label
                - y_label: (optional) Y label

        Returns:
            Dict mapping pathname to success status (True/False)

        Example:
            >>> records = [
            ...     {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///50%/",
            ...      "x_values": hours, "y_values": fractions_50},
            ...     {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///90%/",
            ...      "x_values": hours, "y_values": fractions_90},
            ... ]
            >>> results = HmsDss.write_multiple_paired_data("temporal.dss", records)
            >>> print(f"Wrote {sum(results.values())} records")
        """
        import numpy as np
        dss_file = Path(dss_file)

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius"
            )

        # Convert values to numpy arrays if needed
        for record in paired_data_records:
            record['x_values'] = np.asarray(record['x_values'])
            record['y_values'] = np.asarray(record['y_values'])

        return DssCore.write_multiple_paired_data(dss_file, paired_data_records)

    @staticmethod
    @log_call
    def import_atlas14_temporal(
        dss_file: Union[str, Path],
        temporal_distributions: Dict[str, 'pd.DataFrame'],
        region_code: str,
        duration_hours: int = 24
    ) -> Dict[str, bool]:
        """
        Import Atlas 14 temporal distributions to DSS file.

        Convenience method specifically for Atlas 14 temporal data.
        Creates proper DSS pathnames and writes all quartiles and probabilities.

        Args:
            dss_file: Output DSS file path
            temporal_distributions: Dict mapping quartile names to DataFrames
                Each DataFrame should have:
                - Index: hours (0, 0.5, 1.0, ... 24.0)
                - Columns: probability strings ("90%", "80%", ..., "10%")
                - Values: cumulative percentages (0-100)
            region_code: Atlas 14 region code (e.g., "TX_R3")
            duration_hours: Storm duration in hours (default: 24)

        Returns:
            Dict mapping pathname to success status

        Example:
            >>> # Parse temporal CSV
            >>> temporal = parse_temporal_csv(csv_content)
            >>>
            >>> # Import to DSS
            >>> results = HmsDss.import_atlas14_temporal(
            ...     "TX_R3_24H.dss",
            ...     temporal,
            ...     region_code="TX_R3",
            ...     duration_hours=24
            ... )
            >>> print(f"Imported {sum(results.values())}/45 distributions")
        """
        import numpy as np

        dss_file = Path(dss_file)

        if not DSS_AVAILABLE:
            raise ImportError("DSS functionality requires pyjnius.")

        # Map quartile names to DSS-compatible names
        quartile_dss_names = {
            "First Quartile": "FIRST-QUARTILE",
            "Second Quartile": "SECOND-QUARTILE",
            "Third Quartile": "THIRD-QUARTILE",
            "Fourth Quartile": "FOURTH-QUARTILE",
            "All Cases": "ALL-CASES"
        }

        duration_str = f"{duration_hours}HR"
        records = []

        for quartile_name, df in temporal_distributions.items():
            dss_quartile = quartile_dss_names.get(quartile_name)
            if dss_quartile is None:
                logger.warning(f"Unknown quartile: {quartile_name}, skipping")
                continue

            x_values = np.array(df.index)  # Hours

            for prob_col in df.columns:
                # Generate pathname
                pathname = f"//{region_code}/{dss_quartile}/{duration_str}///{prob_col}/"

                # Convert percentage to fraction
                y_values = df[prob_col].values / 100.0

                records.append({
                    'pathname': pathname,
                    'x_values': x_values,
                    'y_values': y_values,
                    'x_units': 'HOURS',
                    'y_units': 'FRACTION',
                    'x_label': 'TIME',
                    'y_label': 'CUMULATIVE'
                })

        logger.info(f"Importing {len(records)} temporal distributions to {dss_file}")
        return DssCore.write_multiple_paired_data(dss_file, records)

    @staticmethod
    @log_call
    def read_paired_data(
        dss_file: Union[str, Path],
        pathname: str
    ) -> Optional[Dict[str, Any]]:
        """
        Read paired data (X-Y curve) from DSS file.

        This is used for reading rating curves, diversion tables,
        storage-flow relationships, and other X-Y paired data.

        Args:
            dss_file: Path to DSS file
            pathname: DSS pathname for paired data (e.g., "//ELEMENT/FLOW-DIVERSION///TABLE/")

        Returns:
            Dictionary with:
                - x_values: numpy array of X values
                - y_values: numpy array of Y values
                - x_units: X axis units
                - y_units: Y axis units
                - x_label: X axis label
                - y_label: Y axis label
                - pathname: Original pathname
            Returns None if read fails

        Example:
            >>> data = HmsDss.read_paired_data(
            ...     "paired_data.dss",
            ...     "//T1011300_0000_D/FLOW-DIVERSION///TABLE/"
            ... )
            >>> print(f"Inflow values: {data['x_values']}")
            >>> print(f"Diversion values: {data['y_values']}")
        """
        dss_file = Path(dss_file)

        if not dss_file.exists():
            raise FileNotFoundError(f"DSS file not found: {dss_file}")

        if not DSS_AVAILABLE:
            raise ImportError(
                "DSS functionality requires pyjnius.\n"
                "Install with: pip install pyjnius\n"
                "Also requires Java 8+ (JRE or JDK)"
            )

        return DssCore.read_paired_data(dss_file, pathname)

is_available() staticmethod

Check if full DSS functionality is available.

Returns:

Type Description
bool

True if pyjnius can be imported and DSS operations will work

Example

if HmsDss.is_available(): ... catalog = HmsDss.get_catalog("file.dss") ... else: ... print("Install pyjnius: pip install pyjnius")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
def is_available() -> bool:
    """
    Check if full DSS functionality is available.

    Returns:
        True if pyjnius can be imported and DSS operations will work

    Example:
        >>> if HmsDss.is_available():
        ...     catalog = HmsDss.get_catalog("file.dss")
        ... else:
        ...     print("Install pyjnius: pip install pyjnius")
    """
    if not DSS_AVAILABLE:
        return False
    return DssCore.is_available()

get_catalog(dss_file) staticmethod

Get catalog of all paths in a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
List[str]

List of DSS pathnames

Example

paths = HmsDss.get_catalog("results.dss") for path in paths: ... print(path)

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def get_catalog(
    dss_file: Union[str, Path]
) -> List[str]:
    """
    Get catalog of all paths in a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        List of DSS pathnames

    Example:
        >>> paths = HmsDss.get_catalog("results.dss")
        >>> for path in paths:
        ...     print(path)
    """
    dss_file = Path(dss_file)

    if not dss_file.exists():
        raise FileNotFoundError(f"DSS file not found: {dss_file}")

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius\n"
            "Also requires Java 8+ (JRE or JDK)"
        )

    return DssCore.get_catalog(dss_file)

read_timeseries(dss_file, pathname) staticmethod

Read a time series from a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
pathname str

DSS pathname to read

required

Returns:

Type Description
DataFrame

DataFrame with:

DataFrame
  • DatetimeIndex for time series operations
DataFrame
  • 'datetime' column for plotting (same as index)
DataFrame
  • 'value' column with time series data
DataFrame
  • Metadata via df.attrs: pathname, units, type, interval
Example

ts = HmsDss.read_timeseries( ... "results.dss", ... "/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/" ... ) print(ts.head()) print(f"Units: {ts.attrs['units']}")

Plotting options:

ax.plot(ts['datetime'], ts['value']) # Using datetime column ax.plot(ts.index, ts['value']) # Using DatetimeIndex ts.plot(y='value') # Pandas automatic

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def read_timeseries(
    dss_file: Union[str, Path],
    pathname: str
) -> pd.DataFrame:
    """
    Read a time series from a DSS file.

    Args:
        dss_file: Path to the DSS file
        pathname: DSS pathname to read

    Returns:
        DataFrame with:
        - DatetimeIndex for time series operations
        - 'datetime' column for plotting (same as index)
        - 'value' column with time series data
        - Metadata via df.attrs: pathname, units, type, interval

    Example:
        >>> ts = HmsDss.read_timeseries(
        ...     "results.dss",
        ...     "/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/"
        ... )
        >>> print(ts.head())
        >>> print(f"Units: {ts.attrs['units']}")
        >>>
        >>> # Plotting options:
        >>> ax.plot(ts['datetime'], ts['value'])    # Using datetime column
        >>> ax.plot(ts.index, ts['value'])          # Using DatetimeIndex
        >>> ts.plot(y='value')                      # Pandas automatic
    """
    dss_file = Path(dss_file)

    if not dss_file.exists():
        raise FileNotFoundError(f"DSS file not found: {dss_file}")

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius"
        )

    return DssCore.read_timeseries(dss_file, pathname)

read_multiple_timeseries(dss_file, pathnames) staticmethod

Read multiple time series from a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
pathnames List[str]

List of DSS pathnames to read

required

Returns:

Type Description
Dict[str, DataFrame]

Dictionary mapping pathnames to DataFrames (None on failure)

Example

paths = ["/BASIN/SUB1/FLOW//15MIN/RUN1/", "/BASIN/SUB2/FLOW//15MIN/RUN1/"] data = HmsDss.read_multiple_timeseries("results.dss", paths) for path, df in data.items(): ... if df is not None: ... print(f"{path}: {len(df)} points")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def read_multiple_timeseries(
    dss_file: Union[str, Path],
    pathnames: List[str]
) -> Dict[str, pd.DataFrame]:
    """
    Read multiple time series from a DSS file.

    Args:
        dss_file: Path to the DSS file
        pathnames: List of DSS pathnames to read

    Returns:
        Dictionary mapping pathnames to DataFrames (None on failure)

    Example:
        >>> paths = ["/BASIN/SUB1/FLOW//15MIN/RUN1/", "/BASIN/SUB2/FLOW//15MIN/RUN1/"]
        >>> data = HmsDss.read_multiple_timeseries("results.dss", paths)
        >>> for path, df in data.items():
        ...     if df is not None:
        ...         print(f"{path}: {len(df)} points")
    """
    dss_file = Path(dss_file)

    if not DSS_AVAILABLE:
        raise ImportError("DSS functionality requires pyjnius.")

    return DssCore.read_multiple_timeseries(dss_file, pathnames)

extract_hms_results(dss_file, element_names=None, result_type='flow', run_name=None) staticmethod

Extract HMS simulation results from a DSS file.

This is a convenience method that filters results by element name and result type.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_names Optional[List[str]]

List of element names to extract (all if None)

None
result_type str

Type of result ("flow", "precipitation", "stage", etc.)

'flow'
run_name Optional[str]

Optional F-part run name filter

None

Returns:

Type Description
Dict[str, DataFrame]

Dictionary mapping element names to result DataFrames

Example

results = HmsDss.extract_hms_results( ... "results.dss", ... element_names=["Outlet", "Junction-1"], ... result_type="flow" ... ) for name, df in results.items(): ... print(f"{name}: peak = {df['value'].max():.2f}")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def extract_hms_results(
    dss_file: Union[str, Path],
    element_names: Optional[List[str]] = None,
    result_type: str = "flow",
    run_name: Optional[str] = None,
) -> Dict[str, pd.DataFrame]:
    """
    Extract HMS simulation results from a DSS file.

    This is a convenience method that filters results by element name
    and result type.

    Args:
        dss_file: Path to the DSS file
        element_names: List of element names to extract (all if None)
        result_type: Type of result ("flow", "precipitation", "stage", etc.)
        run_name: Optional F-part run name filter

    Returns:
        Dictionary mapping element names to result DataFrames

    Example:
        >>> results = HmsDss.extract_hms_results(
        ...     "results.dss",
        ...     element_names=["Outlet", "Junction-1"],
        ...     result_type="flow"
        ... )
        >>> for name, df in results.items():
        ...     print(f"{name}: peak = {df['value'].max():.2f}")
    """
    dss_file = Path(dss_file)

    if not DSS_AVAILABLE:
        raise ImportError("DSS functionality requires pyjnius.")

    catalog = HmsDss.get_catalog(dss_file)
    matching_paths = select_result_paths(
        catalog,
        result_type=result_type,
        element_names=element_names,
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

    # Read matching time series
    results = {}
    for path in matching_paths:
        parts = parse_pathname(path)
        element_name = parts["element_name"]
        try:
            df = HmsDss.read_timeseries(dss_file, path)
            results[element_name] = df
        except Exception as e:
            logger.warning(f"Could not read {path}: {e}")

    logger.info(f"Extracted {len(results)} result time series")
    return results

get_info(dss_file) staticmethod

Get summary information about a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
Dict[str, Any]

Dictionary with file information including path count and data types

Example

info = HmsDss.get_info("results.dss") print(f"Total paths: {info['num_paths']}") print(f"Data types: {info['path_types']}")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def get_info(
    dss_file: Union[str, Path]
) -> Dict[str, Any]:
    """
    Get summary information about a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        Dictionary with file information including path count and data types

    Example:
        >>> info = HmsDss.get_info("results.dss")
        >>> print(f"Total paths: {info['num_paths']}")
        >>> print(f"Data types: {info['path_types']}")
    """
    dss_file = Path(dss_file)

    info = {
        'file_path': str(dss_file),
        'file_exists': dss_file.exists(),
        'file_size_mb': None,
        'num_paths': None,
        'path_types': {},
        'dss_available': DSS_AVAILABLE
    }

    if dss_file.exists():
        info['file_size_mb'] = round(dss_file.stat().st_size / (1024 * 1024), 2)

        if DSS_AVAILABLE:
            try:
                catalog = HmsDss.get_catalog(dss_file)
                info['num_paths'] = len(catalog)

                # Categorize paths by C-part
                for path in catalog:
                    parts = path.split('/')
                    if len(parts) >= 4:
                        data_type = parts[3]  # C part
                        info['path_types'][data_type] = info['path_types'].get(data_type, 0) + 1

            except Exception as e:
                logger.warning(f"Could not read DSS catalog: {e}")

    return info

list_flow_results(dss_file) staticmethod

List all flow result pathnames in a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
List[str]

List of flow result pathnames

Example

flows = HmsDss.list_flow_results("results.dss") for path in flows: ... parts = HmsDss.parse_dss_pathname(path) ... print(f"{parts['element_name']}: {path}")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def list_flow_results(
    dss_file: Union[str, Path]
) -> List[str]:
    """
    List all flow result pathnames in a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        List of flow result pathnames

    Example:
        >>> flows = HmsDss.list_flow_results("results.dss")
        >>> for path in flows:
        ...     parts = HmsDss.parse_dss_pathname(path)
        ...     print(f"{parts['element_name']}: {path}")
    """
    catalog = HmsDss.get_catalog(dss_file)
    return select_result_paths(
        catalog,
        result_type="flow",
        exclude_tables=False,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

list_precipitation_data(dss_file) staticmethod

List all precipitation data pathnames in a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
List[str]

List of precipitation pathnames

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def list_precipitation_data(
    dss_file: Union[str, Path]
) -> List[str]:
    """
    List all precipitation data pathnames in a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        List of precipitation pathnames
    """
    catalog = HmsDss.get_catalog(dss_file)
    return select_result_paths(
        catalog,
        result_type="precipitation",
        exclude_tables=False,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

list_stage_results(dss_file) staticmethod

List all stage result pathnames in a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
List[str]

List of stage result pathnames

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def list_stage_results(
    dss_file: Union[str, Path]
) -> List[str]:
    """
    List all stage result pathnames in a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        List of stage result pathnames
    """
    catalog = HmsDss.get_catalog(dss_file)
    return select_result_paths(
        catalog,
        result_type="stage",
        exclude_tables=False,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

list_storage_results(dss_file) staticmethod

List all storage result pathnames in a DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required

Returns:

Type Description
List[str]

List of storage result pathnames

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def list_storage_results(
    dss_file: Union[str, Path]
) -> List[str]:
    """
    List all storage result pathnames in a DSS file.

    Args:
        dss_file: Path to the DSS file

    Returns:
        List of storage result pathnames
    """
    catalog = HmsDss.get_catalog(dss_file)
    return select_result_paths(
        catalog,
        result_type="storage",
        exclude_tables=False,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

parse_dss_pathname(pathname) staticmethod

Parse a DSS pathname into its component parts.

DSS pathnames have format: /A/B/C/D/E/F/ - A: Basin/Project identifier - B: Location/Element name - C: Data type (FLOW, PRECIP, etc.) - D: Date/Time block - E: Time interval - F: Version/Run identifier

Parameters:

Name Type Description Default
pathname str

DSS pathname string

required

Returns:

Type Description
Dict[str, str]

Dictionary with pathname components including convenience fields

Example

parts = HmsDss.parse_dss_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/") print(parts['element_name']) # 'OUTLET' print(parts['data_type']) # 'FLOW' print(parts['run_name']) # 'RUN1'

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def parse_dss_pathname(pathname: str) -> Dict[str, str]:
    """
    Parse a DSS pathname into its component parts.

    DSS pathnames have format: /A/B/C/D/E/F/
    - A: Basin/Project identifier
    - B: Location/Element name
    - C: Data type (FLOW, PRECIP, etc.)
    - D: Date/Time block
    - E: Time interval
    - F: Version/Run identifier

    Args:
        pathname: DSS pathname string

    Returns:
        Dictionary with pathname components including convenience fields

    Example:
        >>> parts = HmsDss.parse_dss_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
        >>> print(parts['element_name'])  # 'OUTLET'
        >>> print(parts['data_type'])     # 'FLOW'
        >>> print(parts['run_name'])      # 'RUN1'
    """
    return parse_pathname(pathname)

create_dss_pathname(basin, element, data_type, interval, run_name='', date_block='') staticmethod

Create a DSS pathname from components.

Parameters:

Name Type Description Default
basin str

Basin/Project name (A part)

required
element str

Element name (B part)

required
data_type str

Data type like FLOW, PRECIP (C part)

required
interval str

Time interval like 15MIN, 1HOUR (E part)

required
run_name str

Run identifier (F part)

''
date_block str

Date block (D part, usually empty)

''

Returns:

Type Description
str

Formatted DSS pathname

Example

path = HmsDss.create_dss_pathname( ... "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1" ... ) print(path) # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def create_dss_pathname(
    basin: str,
    element: str,
    data_type: str,
    interval: str,
    run_name: str = "",
    date_block: str = ""
) -> str:
    """
    Create a DSS pathname from components.

    Args:
        basin: Basin/Project name (A part)
        element: Element name (B part)
        data_type: Data type like FLOW, PRECIP (C part)
        interval: Time interval like 15MIN, 1HOUR (E part)
        run_name: Run identifier (F part)
        date_block: Date block (D part, usually empty)

    Returns:
        Formatted DSS pathname

    Example:
        >>> path = HmsDss.create_dss_pathname(
        ...     "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1"
        ... )
        >>> print(path)  # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'
    """
    return create_pathname(basin, element, data_type, interval, run_name, date_block)

filter_catalog(catalog, pattern=None, data_type=None, element=None) staticmethod

Filter DSS catalog by pattern or components.

Parameters:

Name Type Description Default
catalog List[str]

List of DSS pathnames

required
pattern Optional[str]

Regex pattern to match against full pathname

None
data_type Optional[str]

Filter by C-part (e.g., "FLOW", "PRECIP")

None
element Optional[str]

Filter by B-part (element/location name)

None

Returns:

Type Description
List[str]

Filtered list of pathnames

Example

paths = HmsDss.get_catalog("file.dss") flow_paths = HmsDss.filter_catalog(paths, data_type="FLOW") outlet_flows = HmsDss.filter_catalog(flow_paths, element="OUTLET")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def filter_catalog(
    catalog: List[str],
    pattern: Optional[str] = None,
    data_type: Optional[str] = None,
    element: Optional[str] = None
) -> List[str]:
    """
    Filter DSS catalog by pattern or components.

    Args:
        catalog: List of DSS pathnames
        pattern: Regex pattern to match against full pathname
        data_type: Filter by C-part (e.g., "FLOW", "PRECIP")
        element: Filter by B-part (element/location name)

    Returns:
        Filtered list of pathnames

    Example:
        >>> paths = HmsDss.get_catalog("file.dss")
        >>> flow_paths = HmsDss.filter_catalog(paths, data_type="FLOW")
        >>> outlet_flows = HmsDss.filter_catalog(flow_paths, element="OUTLET")
    """
    return filter_catalog(catalog, pattern, data_type, element)

get_peak_flows(dss_file, element_names=None) staticmethod

Extract peak flow values for all elements.

Note

For large DSS files with many elements (100+), consider using get_peak_flows_batched() instead to avoid memory issues. That method processes elements in batches with garbage collection between batches.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_names Optional[List[str]]

Optional list of element names to filter

None

Returns:

Type Description
DataFrame

DataFrame with columns: element, peak_flow, peak_time, units

Example

peaks = HmsDss.get_peak_flows("results.dss") print(peaks.sort_values('peak_flow', ascending=False))

See Also

get_peak_flows_batched: Memory-efficient version for large files

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def get_peak_flows(
    dss_file: Union[str, Path],
    element_names: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Extract peak flow values for all elements.

    Note:
        For large DSS files with many elements (100+), consider using
        get_peak_flows_batched() instead to avoid memory issues. That method
        processes elements in batches with garbage collection between batches.

    Args:
        dss_file: Path to the DSS file
        element_names: Optional list of element names to filter

    Returns:
        DataFrame with columns: element, peak_flow, peak_time, units

    Example:
        >>> peaks = HmsDss.get_peak_flows("results.dss")
        >>> print(peaks.sort_values('peak_flow', ascending=False))

    See Also:
        get_peak_flows_batched: Memory-efficient version for large files
    """
    results = HmsDss.extract_hms_results(dss_file, element_names, result_type="flow")

    records = []
    for element_name, df in results.items():
        if df is not None and not df.empty:
            peak_idx = df['value'].idxmax()
            records.append({
                'element': element_name,
                'peak_flow': df.loc[peak_idx, 'value'],
                'peak_time': peak_idx,
                'units': df.attrs.get('units', '')
            })

    return pd.DataFrame(records)

get_peak_flows_batched(dss_file, element_names=None, run_name=None, batch_size=50, progress=True) staticmethod

Extract peak flows in batches to avoid memory issues.

Processes N elements at a time, garbage collects between batches, preventing memory exhaustion for large DSS files.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
element_names Optional[List[str]]

Optional filter for specific elements

None
run_name Optional[str]

Optional filter for specific run (e.g., "1%(100YR)RUN")

None
batch_size int

Elements to process per batch (default: 50)

50
progress bool

Show progress logging (default: True)

True

Returns:

Type Description
DataFrame

DataFrame with columns: - element: Element name - peak_flow: Peak flow value (cfs) - peak_time: Time of peak (datetime) - units: Engineering units - dss_path: Full DSS pathname

Memory: O(batch_size) instead of O(total_elements)

Example

Safe for 1000+ elements

peaks = HmsDss.get_peak_flows_batched( ... "results.dss", ... run_name="1%(100YR)RUN", ... batch_size=100 # Adjust for available memory ... )

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def get_peak_flows_batched(
    dss_file: Union[str, Path],
    element_names: Optional[List[str]] = None,
    run_name: Optional[str] = None,
    batch_size: int = 50,
    progress: bool = True
) -> pd.DataFrame:
    """
    Extract peak flows in batches to avoid memory issues.

    Processes N elements at a time, garbage collects between batches,
    preventing memory exhaustion for large DSS files.

    Args:
        dss_file: Path to DSS file
        element_names: Optional filter for specific elements
        run_name: Optional filter for specific run (e.g., "1%(100YR)RUN")
        batch_size: Elements to process per batch (default: 50)
        progress: Show progress logging (default: True)

    Returns:
        DataFrame with columns:
            - element: Element name
            - peak_flow: Peak flow value (cfs)
            - peak_time: Time of peak (datetime)
            - units: Engineering units
            - dss_path: Full DSS pathname

    Memory: O(batch_size) instead of O(total_elements)

    Example:
        >>> # Safe for 1000+ elements
        >>> peaks = HmsDss.get_peak_flows_batched(
        ...     "results.dss",
        ...     run_name="1%(100YR)RUN",
        ...     batch_size=100  # Adjust for available memory
        ... )
    """
    dss_file = Path(dss_file)

    if not dss_file.exists():
        raise FileNotFoundError(f"DSS file not found: {dss_file}")

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius\n"
            "Also requires Java 8+ (JRE or JDK)"
        )

    catalog = HmsDss.get_catalog(dss_file)
    flow_paths = select_result_paths(
        catalog,
        result_type="flow-total",
        element_names=element_names,
        run_name=run_name,
        exclude_tables=True,
        result_patterns=HmsDss.HMS_RESULT_PATTERNS,
    )

    total_paths = len(flow_paths)
    if total_paths == 0:
        logger.info("No flow paths found in DSS file")
        return pd.DataFrame(columns=['element', 'peak_flow', 'peak_time', 'units', 'dss_path'])

    if progress:
        logger.info(f"Extracting peaks from {total_paths} paths...")

    records = []
    total_batches = (total_paths + batch_size - 1) // batch_size  # Ceiling division

    for i in range(0, total_paths, batch_size):
        batch_num = i // batch_size + 1
        batch_paths = flow_paths[i:i + batch_size]

        if progress:
            logger.info(f"Batch {batch_num}/{total_batches}: processing {len(batch_paths)} paths...")

        # Process each path in the batch
        for path in batch_paths:
            try:
                parts = HmsDss.parse_dss_pathname(path)

                # Use peak-only extraction (350x more memory efficient)
                peak_info = DssCore.get_peak_value(dss_file, path)

                if peak_info is not None:
                    records.append({
                        'element': parts['element_name'],
                        'peak_flow': peak_info['peak_flow'],
                        'peak_time': peak_info['peak_time'],
                        'units': peak_info['units'],
                        'dss_path': path
                    })

            except Exception as e:
                logger.warning(f"Could not read {path}: {e}")

        # Garbage collect after each batch to free memory
        gc.collect()

    # Create DataFrame from records
    result_df = pd.DataFrame(records)

    # Sort by peak_flow descending
    if not result_df.empty:
        result_df = result_df.sort_values('peak_flow', ascending=False).reset_index(drop=True)

    if progress:
        logger.info(f"Extracted peak flows for {len(result_df)} elements")
    return result_df

get_total_precipitation(dss_file, element_names=None) staticmethod

Calculate total precipitation for all elements.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to the DSS file

required
element_names Optional[List[str]]

Optional list of element names to filter

None

Returns:

Type Description
DataFrame

DataFrame with columns: element, total_precip, units, num_intervals

Example

precip = HmsDss.get_total_precipitation("results.dss") print(precip)

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def get_total_precipitation(
    dss_file: Union[str, Path],
    element_names: Optional[List[str]] = None
) -> pd.DataFrame:
    """
    Calculate total precipitation for all elements.

    Args:
        dss_file: Path to the DSS file
        element_names: Optional list of element names to filter

    Returns:
        DataFrame with columns: element, total_precip, units, num_intervals

    Example:
        >>> precip = HmsDss.get_total_precipitation("results.dss")
        >>> print(precip)
    """
    results = HmsDss.extract_hms_results(
        dss_file, element_names, result_type="precipitation"
    )

    records = []
    for element_name, df in results.items():
        if df is not None and not df.empty:
            records.append({
                'element': element_name,
                'total_precip': df['value'].sum(),
                'units': df.attrs.get('units', ''),
                'num_intervals': len(df)
            })

    return pd.DataFrame(records)

write_paired_data(dss_file, pathname, x_values, y_values, x_units='HOURS', y_units='FRACTION', x_label='TIME', y_label='CUMULATIVE') staticmethod

Write paired data (X-Y curve) to DSS file.

This is used for Atlas 14 temporal distributions, rating curves, and other X-Y relationships.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file (created if doesn't exist)

required
pathname str

DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")

required
x_values

X coordinates (e.g., time in hours) - numpy array or list

required
y_values

Y coordinates (e.g., cumulative fraction 0-1) - numpy array or list

required
x_units str

Units for X values (default: "HOURS")

'HOURS'
y_units str

Units for Y values (default: "FRACTION")

'FRACTION'
x_label str

Label for X axis (default: "TIME")

'TIME'
y_label str

Label for Y axis (default: "CUMULATIVE")

'CUMULATIVE'

Returns:

Type Description
bool

True if write succeeded, False otherwise

Example

import numpy as np x = np.linspace(0, 24, 49) # 0 to 24 hours y = np.linspace(0, 1, 49) # 0 to 100% cumulative HmsDss.write_paired_data( ... "temporal.dss", ... "//TX_R3/ALL-CASES/24HR///50%/", ... x, y ... )

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def write_paired_data(
    dss_file: Union[str, Path],
    pathname: str,
    x_values,
    y_values,
    x_units: str = "HOURS",
    y_units: str = "FRACTION",
    x_label: str = "TIME",
    y_label: str = "CUMULATIVE"
) -> bool:
    """
    Write paired data (X-Y curve) to DSS file.

    This is used for Atlas 14 temporal distributions, rating curves,
    and other X-Y relationships.

    Args:
        dss_file: Path to DSS file (created if doesn't exist)
        pathname: DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")
        x_values: X coordinates (e.g., time in hours) - numpy array or list
        y_values: Y coordinates (e.g., cumulative fraction 0-1) - numpy array or list
        x_units: Units for X values (default: "HOURS")
        y_units: Units for Y values (default: "FRACTION")
        x_label: Label for X axis (default: "TIME")
        y_label: Label for Y axis (default: "CUMULATIVE")

    Returns:
        True if write succeeded, False otherwise

    Example:
        >>> import numpy as np
        >>> x = np.linspace(0, 24, 49)  # 0 to 24 hours
        >>> y = np.linspace(0, 1, 49)   # 0 to 100% cumulative
        >>> HmsDss.write_paired_data(
        ...     "temporal.dss",
        ...     "//TX_R3/ALL-CASES/24HR///50%/",
        ...     x, y
        ... )
    """
    import numpy as np
    dss_file = Path(dss_file)

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius\n"
            "Also requires Java 8+ (JRE or JDK)"
        )

    # Convert to numpy arrays if needed
    x_values = np.asarray(x_values)
    y_values = np.asarray(y_values)

    return DssCore.write_paired_data(
        dss_file, pathname, x_values, y_values,
        x_units, y_units, x_label, y_label
    )

write_multiple_paired_data(dss_file, paired_data_records) staticmethod

Write multiple paired data records to DSS file.

More efficient than calling write_paired_data repeatedly as it keeps the DSS file open for all writes.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
paired_data_records List[Dict]

List of dicts with keys: - pathname: DSS pathname - x_values: numpy array of X values - y_values: numpy array of Y values - x_units: (optional) X units - y_units: (optional) Y units - x_label: (optional) X label - y_label: (optional) Y label

required

Returns:

Type Description
Dict[str, bool]

Dict mapping pathname to success status (True/False)

Example

records = [ ... {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///50%/", ... "x_values": hours, "y_values": fractions_50}, ... {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///90%/", ... "x_values": hours, "y_values": fractions_90}, ... ] results = HmsDss.write_multiple_paired_data("temporal.dss", records) print(f"Wrote {sum(results.values())} records")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def write_multiple_paired_data(
    dss_file: Union[str, Path],
    paired_data_records: List[Dict]
) -> Dict[str, bool]:
    """
    Write multiple paired data records to DSS file.

    More efficient than calling write_paired_data repeatedly as it
    keeps the DSS file open for all writes.

    Args:
        dss_file: Path to DSS file
        paired_data_records: List of dicts with keys:
            - pathname: DSS pathname
            - x_values: numpy array of X values
            - y_values: numpy array of Y values
            - x_units: (optional) X units
            - y_units: (optional) Y units
            - x_label: (optional) X label
            - y_label: (optional) Y label

    Returns:
        Dict mapping pathname to success status (True/False)

    Example:
        >>> records = [
        ...     {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///50%/",
        ...      "x_values": hours, "y_values": fractions_50},
        ...     {"pathname": "//TX_R3/FIRST-QUARTILE/24HR///90%/",
        ...      "x_values": hours, "y_values": fractions_90},
        ... ]
        >>> results = HmsDss.write_multiple_paired_data("temporal.dss", records)
        >>> print(f"Wrote {sum(results.values())} records")
    """
    import numpy as np
    dss_file = Path(dss_file)

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius"
        )

    # Convert values to numpy arrays if needed
    for record in paired_data_records:
        record['x_values'] = np.asarray(record['x_values'])
        record['y_values'] = np.asarray(record['y_values'])

    return DssCore.write_multiple_paired_data(dss_file, paired_data_records)

import_atlas14_temporal(dss_file, temporal_distributions, region_code, duration_hours=24) staticmethod

Import Atlas 14 temporal distributions to DSS file.

Convenience method specifically for Atlas 14 temporal data. Creates proper DSS pathnames and writes all quartiles and probabilities.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Output DSS file path

required
temporal_distributions Dict[str, DataFrame]

Dict mapping quartile names to DataFrames Each DataFrame should have: - Index: hours (0, 0.5, 1.0, ... 24.0) - Columns: probability strings ("90%", "80%", ..., "10%") - Values: cumulative percentages (0-100)

required
region_code str

Atlas 14 region code (e.g., "TX_R3")

required
duration_hours int

Storm duration in hours (default: 24)

24

Returns:

Type Description
Dict[str, bool]

Dict mapping pathname to success status

Example

Parse temporal CSV

temporal = parse_temporal_csv(csv_content)

Import to DSS

results = HmsDss.import_atlas14_temporal( ... "TX_R3_24H.dss", ... temporal, ... region_code="TX_R3", ... duration_hours=24 ... ) print(f"Imported {sum(results.values())}/45 distributions")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def import_atlas14_temporal(
    dss_file: Union[str, Path],
    temporal_distributions: Dict[str, 'pd.DataFrame'],
    region_code: str,
    duration_hours: int = 24
) -> Dict[str, bool]:
    """
    Import Atlas 14 temporal distributions to DSS file.

    Convenience method specifically for Atlas 14 temporal data.
    Creates proper DSS pathnames and writes all quartiles and probabilities.

    Args:
        dss_file: Output DSS file path
        temporal_distributions: Dict mapping quartile names to DataFrames
            Each DataFrame should have:
            - Index: hours (0, 0.5, 1.0, ... 24.0)
            - Columns: probability strings ("90%", "80%", ..., "10%")
            - Values: cumulative percentages (0-100)
        region_code: Atlas 14 region code (e.g., "TX_R3")
        duration_hours: Storm duration in hours (default: 24)

    Returns:
        Dict mapping pathname to success status

    Example:
        >>> # Parse temporal CSV
        >>> temporal = parse_temporal_csv(csv_content)
        >>>
        >>> # Import to DSS
        >>> results = HmsDss.import_atlas14_temporal(
        ...     "TX_R3_24H.dss",
        ...     temporal,
        ...     region_code="TX_R3",
        ...     duration_hours=24
        ... )
        >>> print(f"Imported {sum(results.values())}/45 distributions")
    """
    import numpy as np

    dss_file = Path(dss_file)

    if not DSS_AVAILABLE:
        raise ImportError("DSS functionality requires pyjnius.")

    # Map quartile names to DSS-compatible names
    quartile_dss_names = {
        "First Quartile": "FIRST-QUARTILE",
        "Second Quartile": "SECOND-QUARTILE",
        "Third Quartile": "THIRD-QUARTILE",
        "Fourth Quartile": "FOURTH-QUARTILE",
        "All Cases": "ALL-CASES"
    }

    duration_str = f"{duration_hours}HR"
    records = []

    for quartile_name, df in temporal_distributions.items():
        dss_quartile = quartile_dss_names.get(quartile_name)
        if dss_quartile is None:
            logger.warning(f"Unknown quartile: {quartile_name}, skipping")
            continue

        x_values = np.array(df.index)  # Hours

        for prob_col in df.columns:
            # Generate pathname
            pathname = f"//{region_code}/{dss_quartile}/{duration_str}///{prob_col}/"

            # Convert percentage to fraction
            y_values = df[prob_col].values / 100.0

            records.append({
                'pathname': pathname,
                'x_values': x_values,
                'y_values': y_values,
                'x_units': 'HOURS',
                'y_units': 'FRACTION',
                'x_label': 'TIME',
                'y_label': 'CUMULATIVE'
            })

    logger.info(f"Importing {len(records)} temporal distributions to {dss_file}")
    return DssCore.write_multiple_paired_data(dss_file, records)

read_paired_data(dss_file, pathname) staticmethod

Read paired data (X-Y curve) from DSS file.

This is used for reading rating curves, diversion tables, storage-flow relationships, and other X-Y paired data.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
pathname str

DSS pathname for paired data (e.g., "//ELEMENT/FLOW-DIVERSION///TABLE/")

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary with: - x_values: numpy array of X values - y_values: numpy array of Y values - x_units: X axis units - y_units: Y axis units - x_label: X axis label - y_label: Y axis label - pathname: Original pathname

Optional[Dict[str, Any]]

Returns None if read fails

Example

data = HmsDss.read_paired_data( ... "paired_data.dss", ... "//T1011300_0000_D/FLOW-DIVERSION///TABLE/" ... ) print(f"Inflow values: {data['x_values']}") print(f"Diversion values: {data['y_values']}")

Source code in hms_commander/dss/hms_dss.py
@staticmethod
@log_call
def read_paired_data(
    dss_file: Union[str, Path],
    pathname: str
) -> Optional[Dict[str, Any]]:
    """
    Read paired data (X-Y curve) from DSS file.

    This is used for reading rating curves, diversion tables,
    storage-flow relationships, and other X-Y paired data.

    Args:
        dss_file: Path to DSS file
        pathname: DSS pathname for paired data (e.g., "//ELEMENT/FLOW-DIVERSION///TABLE/")

    Returns:
        Dictionary with:
            - x_values: numpy array of X values
            - y_values: numpy array of Y values
            - x_units: X axis units
            - y_units: Y axis units
            - x_label: X axis label
            - y_label: Y axis label
            - pathname: Original pathname
        Returns None if read fails

    Example:
        >>> data = HmsDss.read_paired_data(
        ...     "paired_data.dss",
        ...     "//T1011300_0000_D/FLOW-DIVERSION///TABLE/"
        ... )
        >>> print(f"Inflow values: {data['x_values']}")
        >>> print(f"Diversion values: {data['y_values']}")
    """
    dss_file = Path(dss_file)

    if not dss_file.exists():
        raise FileNotFoundError(f"DSS file not found: {dss_file}")

    if not DSS_AVAILABLE:
        raise ImportError(
            "DSS functionality requires pyjnius.\n"
            "Install with: pip install pyjnius\n"
            "Also requires Java 8+ (JRE or JDK)"
        )

    return DssCore.read_paired_data(dss_file, pathname)
CLB Engineering Corporation  ·  LLM Forward Engineering
HMS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.