Skip to content

DssCore

Low-level HEC-DSS time-series, paired-data, and catalog operations.

hms_commander.DssCore

Core static class for DSS file operations.

Uses HEC Monolith libraries (auto-downloaded on first use). Supports both DSS V6 and V7 formats.

All heavy dependencies (pyjnius, Java) are lazy-loaded on first use.

Usage

from hms_commander.dss import DssCore

Read time series

df = DssCore.read_timeseries("file.dss", "/BASIN/LOC/FLOW//1HOUR/OBS/")

Get catalog

paths = DssCore.get_catalog("file.dss")

Source code in hms_commander/dss/core.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
class DssCore:
    """
    Core static class for DSS file operations.

    Uses HEC Monolith libraries (auto-downloaded on first use).
    Supports both DSS V6 and V7 formats.

    All heavy dependencies (pyjnius, Java) are lazy-loaded on first use.

    Usage:
        from hms_commander.dss import DssCore

        # Read time series
        df = DssCore.read_timeseries("file.dss", "/BASIN/LOC/FLOW//1HOUR/OBS/")

        # Get catalog
        paths = DssCore.get_catalog("file.dss")
    """

    # Process-wide flag: set to True by _configure_jvm() once JVM setup has been
    # attempted (either configured here or found already running), so the
    # configuration work is not repeated on later calls.
    _jvm_configured = False
    # Process-wide cache for the HecMonolithDownloader instance created by
    # _ensure_monolith(); None until that method has run.
    _monolith = None

    @staticmethod
    def _ensure_monolith():
        """
        Return the HEC Monolith downloader, installing the libraries if needed.

        The downloader is cached on ``DssCore._monolith`` only after the
        install step has completed without raising. If ``install()`` fails,
        nothing is cached and the next call retries the installation instead
        of handing back a downloader whose libraries are missing.

        Returns:
            The cached ``HecMonolithDownloader`` instance.

        Raises:
            Any exception raised by ``HecMonolithDownloader.install()`` is
            propagated unchanged.
        """
        if DssCore._monolith is not None:
            return DssCore._monolith

        # Lazy import from same subpackage
        from ._hec_monolith import HecMonolithDownloader

        monolith = HecMonolithDownloader()

        if not monolith.is_installed():
            print("\n" + "=" * 80)
            print("HEC Monolith libraries not found")
            print("Installing automatically (one-time download, ~20 MB)...")
            print("=" * 80)
            monolith.install()

        # Publish to the class-level cache only now, so a failed install above
        # cannot leave a half-initialised downloader cached for later calls.
        DssCore._monolith = monolith
        return monolith

    @staticmethod
    def _configure_jvm(max_memory: str = "4G"):
        """
        Configure Java VM for DSS operations with memory settings.

        Sets up JVM options, the HEC Monolith classpath and the native library
        search path. Runs at most once per process: later calls return
        immediately once ``DssCore._jvm_configured`` is set.

        Java discovery (only when ``JAVA_HOME`` is unset) happens *before* any
        option is registered with ``jnius_config``, so a "Java not found"
        failure leaves ``jnius_config`` untouched and a later retry does not
        register the same options twice. Version directories are ordered with
        a numeric-aware key so that e.g. ``4.12`` is preferred over ``4.9``.

        Args:
            max_memory: Maximum heap size (default: "4G")
                       Examples: "512M", "2G", "4G", "8G", "16G"

        Memory recommendations:
            - < 1,000 DSS paths: "2G"
            - 1,000-10,000 paths: "4G" (default)
            - 10,000-50,000 paths: "8G"
            - > 50,000 paths: "16G"

        Raises:
            ImportError: If pyjnius (``jnius_config``) cannot be imported.
            RuntimeError: If ``JAVA_HOME`` is unset and no Java installation
                is found in the known locations.

        Note: JVM options must be set BEFORE first jnius import.
              If JVM already running, max_memory parameter is ignored.
        """
        if DssCore._jvm_configured:
            return

        # Ensure monolith is installed
        monolith = DssCore._ensure_monolith()

        # Lazy import pyjnius config
        try:
            import jnius_config
        except ImportError as exc:
            raise ImportError(
                "pyjnius is required for DSS file operations.\n"
                "Install with: pip install pyjnius"
            ) from exc

        # Check if JVM already started - if so, we can't set memory options
        if jnius_config.vm_running:
            logger.warning(f"JVM already running, cannot set max_memory={max_memory}")
            DssCore._jvm_configured = True
            return

        # Get classpath and library path
        classpath = monolith.get_classpath()
        library_path = monolith.get_library_path()

        print("Configuring Java VM for DSS operations...")

        # Set JAVA_HOME if not already set
        if 'JAVA_HOME' not in os.environ:
            import re  # local import: only needed for the version-aware sort

            def _natural_key(entry):
                # Split "name4.10" into ['name', 4, '.', 10, ''] so numeric
                # chunks compare as integers (4.10 > 4.9). re.split with a
                # capturing group always puts the digit runs at odd indexes,
                # so keys stay type-aligned and comparable.
                tokens = re.split(r'([0-9]+)', entry.name)
                return [
                    int(tok) if idx % 2 else tok.lower()
                    for idx, tok in enumerate(tokens)
                ]

            java_candidates = []

            # Check HMS-bundled JREs first (most reliable on Windows)
            if os.name == 'nt':
                for hms_dir in [Path("C:/Program Files/HEC/HEC-HMS"),
                                Path("C:/Program Files (x86)/HEC/HEC-HMS")]:
                    if hms_dir.exists():
                        # Newest version first (numeric-aware ordering)
                        versions = sorted(
                            hms_dir.iterdir(), key=_natural_key, reverse=True
                        )
                        for v in versions:
                            jre = v / "jre"
                            if jre.exists() and (jre / "bin").exists():
                                java_candidates.append(jre)

            # Standard Java installations
            java_candidates.extend([
                Path("C:/Program Files/Java/jdk-21"),
                Path("C:/Program Files/Java/jdk-17"),
                Path("C:/Program Files/Java/jdk-11"),
                Path("/usr/lib/jvm/java-17-openjdk"),
                Path("/usr/lib/jvm/java-11-openjdk"),
            ])

            # Also scan C:/Program Files/Java/ for any JRE/JDK
            java_base = Path("C:/Program Files/Java")
            if java_base.exists():
                for d in sorted(java_base.iterdir(), key=_natural_key, reverse=True):
                    if d.is_dir() and (d / "bin").exists():
                        java_candidates.append(d)

            # First existing candidate wins; the for/else raises when none exist.
            for java_home in java_candidates:
                if java_home.exists():
                    os.environ['JAVA_HOME'] = str(java_home)
                    print(f"  Found Java: {java_home}")
                    break
            else:
                raise RuntimeError(
                    "Java not found. Please set JAVA_HOME environment variable "
                    "or install Java JDK/JRE.\n"
                    "Download from: https://www.oracle.com/java/technologies/downloads/"
                )

        # Configure JVM memory options BEFORE first jnius import
        # These must be set before the JVM starts
        jvm_args = [
            '-Xms512M',              # Initial heap: 512 MB
            f'-Xmx{max_memory}',     # Max heap: configurable
            '-XX:+UseG1GC',          # G1 garbage collector
            '-XX:MaxGCPauseMillis=200',  # Limit GC pause time
        ]
        jnius_config.add_options(*jvm_args)
        logger.info(f"Configured JVM with max memory: {max_memory}")

        # Set classpath (must be done before first import from jnius)
        jnius_config.add_classpath(*classpath)

        # Set library path for native libraries; prepend so the Monolith
        # libraries take precedence over anything already on the path.
        if 'LD_LIBRARY_PATH' in os.environ:
            os.environ['LD_LIBRARY_PATH'] = (
                library_path + os.pathsep + os.environ['LD_LIBRARY_PATH']
            )
        else:
            os.environ['LD_LIBRARY_PATH'] = library_path

        # Windows: Add to PATH for native DLLs
        if os.name == 'nt':
            os.environ['PATH'] = (
                library_path + os.pathsep + os.environ.get('PATH', '')
            )

        DssCore._jvm_configured = True
        print("[OK] Java VM configured")

    @staticmethod
    def is_available() -> bool:
        """
        Check if DSS functionality is available.

        Returns:
            True if pyjnius can be imported
        """
        try:
            import jnius_config
            return True
        except ImportError:
            return False

    @staticmethod
    def get_catalog(dss_file: Union[str, Path]) -> List[str]:
        """
        Return every pathname catalogued in a DSS file.

        The file is opened through ``hec.heclib.dss.HecDss``, its catalog is
        copied into a Python list of strings, and the handle is always
        released with ``done()``.

        Args:
            dss_file: Path to DSS file (resolved to an absolute path first)

        Returns:
            List of DSS path strings, in the order the catalog reports them

        Example:
            paths = DssCore.get_catalog("sample.dss")
            for path in paths:
                print(path)
        """
        # jnius may only be imported after the JVM options are in place.
        DssCore._configure_jvm()

        from jnius import autoclass

        HecDss = autoclass('hec.heclib.dss.HecDss')

        resolved_path = str(Path(dss_file).resolve())

        # NOTE(review): HecDss.open appears to create a missing file - confirm
        # whether a non-existent path should be rejected before this point.
        handle = HecDss.open(resolved_path)

        # Best effort: quieten the DSS library (0 = quiet ... 3 = verbose).
        # The call is optional because some DSS versions may lack the method.
        try:
            handle.setMessageLevel(0)
        except Exception:
            pass

        try:
            # getCatalogedPathnames() yields a Java Vector; copy it element by
            # element into native Python strings.
            java_paths = handle.getCatalogedPathnames()
            return [
                str(java_paths.get(position))
                for position in range(java_paths.size())
            ]
        finally:
            handle.done()

    @staticmethod
    def read_timeseries(
        dss_file: Union[str, Path],
        pathname: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Read time series from DSS file.

        Args:
            dss_file: Path to DSS file
            pathname: DSS pathname (e.g., "/BASIN/LOC/FLOW//1HOUR/OBS/")
            start_date: Optional inclusive lower bound. Anything accepted by
                ``pandas.Timestamp`` (e.g. "2020-01-01 06:00"). Rows earlier
                than this are dropped. None/empty means no lower bound.
            end_date: Optional inclusive upper bound, parsed the same way.
                None/empty means no upper bound.

        Returns:
            pandas DataFrame with:
            - DatetimeIndex for time series operations
            - 'datetime' column for plotting (same as index)
            - 'value' column with time series data
            - Metadata via df.attrs: pathname, units, type, interval, dss_file

            When a date window is given and no sample falls inside it, the
            returned frame is empty but still carries the metadata.

        Raises:
            ValueError: If a date bound cannot be parsed, if the pathname
                yields no container or empty arrays, or if the value and time
                arrays differ in length.

        Example:
            df = DssCore.read_timeseries("file.dss", "/BASIN/LOC/FLOW//1HOUR/OBS/")
            print(df.head())
            print(f"Units: {df.attrs['units']}")

            # Plotting options:
            ax.plot(df.index, df['value'])          # Using DatetimeIndex
            ax.plot(df['datetime'], df['value'])    # Using datetime column
            df.plot(y='value')                      # Pandas automatic
        """
        # Parse the window first so a malformed date fails fast, before the
        # JVM is started or the DSS file is opened.
        window_start = pd.Timestamp(start_date) if start_date else None
        window_end = pd.Timestamp(end_date) if end_date else None

        # Configure JVM (must be before first jnius import)
        DssCore._configure_jvm()

        # Import Java classes via pyjnius (lazy)
        from jnius import autoclass, cast

        HecDss = autoclass('hec.heclib.dss.HecDss')

        dss_file = str(Path(dss_file).resolve())

        # Open DSS file
        dss = HecDss.open(dss_file)

        # Suppress DSS library verbose output (ZREAD/ZOPEN messages)
        # Note: DSS Fortran library writes directly to stdout and cannot be suppressed
        try:
            dss.setMessageLevel(0)  # 0 = quiet, 1 = errors only, 2 = warnings, 3 = verbose
        except Exception:
            pass  # Older DSS versions may not have this method

        try:
            # Read time series
            # True = ignore D-part (date) for wildcards
            container = dss.get(pathname, True)

            if container is None:
                raise ValueError(f"No data found for pathname: {pathname}")

            # Cast to TimeSeriesContainer to access fields
            tsc = cast('hec.io.TimeSeriesContainer', container)

            # Extract values and times from Java container
            values = np.array(tsc.values)  # Java double[] -> numpy array
            times = np.array(tsc.times)    # Java int[] -> numpy array (minutes since 1899-12-31)

            # Validate that we got data
            if len(values) == 0 or len(times) == 0:
                raise ValueError(f"No data found in time series for pathname: {pathname}")

            if len(values) != len(times):
                raise ValueError(
                    f"Mismatched array lengths: {len(values)} values, {len(times)} times"
                )

            # Convert HEC time (minutes since 1899-12-31 00:00:00) to datetimes
            # in one vectorised step instead of a per-element Python loop.
            hec_epoch = pd.Timestamp('1899-12-31 00:00:00')
            datetimes = pd.DatetimeIndex(
                hec_epoch + pd.to_timedelta(times.astype('int64'), unit='m')
            )

            # Apply the optional inclusive [start_date, end_date] window.
            if window_start is not None or window_end is not None:
                keep = np.ones(len(datetimes), dtype=bool)
                if window_start is not None:
                    keep &= np.asarray(datetimes >= window_start)
                if window_end is not None:
                    keep &= np.asarray(datetimes <= window_end)
                datetimes = datetimes[keep]
                values = values[keep]

            # Create DataFrame with DatetimeIndex for time series operations
            df = pd.DataFrame({
                'value': values
            }, index=datetimes)

            # Also add datetime as a column for easier plotting
            # Users can do: ax.plot(df['datetime'], df['value'])
            # Or: df.plot(x='datetime', y='value')
            df.insert(0, 'datetime', df.index)

            # Add metadata as attributes (set last, after all filtering, so it
            # is guaranteed to be present on the returned frame)
            df.attrs['pathname'] = pathname
            df.attrs['units'] = str(tsc.units) if tsc.units else ""
            df.attrs['type'] = str(tsc.type) if tsc.type else ""
            df.attrs['interval'] = (
                int(tsc.interval) if hasattr(tsc, 'interval') else None
            )
            df.attrs['dss_file'] = dss_file

            return df

        finally:
            dss.done()

    @staticmethod
    def _hec_time_to_datetime(hec_time_minutes: int) -> 'pd.Timestamp':
        """
        Convert HEC time (minutes since 1899-12-31) to Python datetime.

        Args:
            hec_time_minutes: Minutes since HEC epoch (1899-12-31 00:00:00)

        Returns:
            pandas Timestamp
        """
        HEC_EPOCH = pd.Timestamp('1899-12-31 00:00:00')
        return HEC_EPOCH + pd.Timedelta(minutes=int(hec_time_minutes))

    @staticmethod
    def get_peak_value(
        dss_file: Union[str, Path],
        pathname: str
    ) -> Optional[Dict[str, Any]]:
        """
        Summarise one DSS time series as peak/min/mean statistics.

        The record's value and time arrays are read into NumPy and reduced
        immediately; no DataFrame is built. Problems are logged and reported
        as ``None`` rather than raised.

        Args:
            dss_file: Path to DSS file
            pathname: DSS pathname to read

        Returns:
            Dictionary with:
                - peak_flow: Maximum value (float)
                - peak_time: Time of maximum (pandas Timestamp)
                - units: Engineering units (str, "" when unset)
                - count: Number of timesteps (int)
                - min_value: Minimum value (float)
                - mean_value: Mean value (float)
            Returns None if the file is missing, the pathname yields no data,
            the arrays are empty or of different lengths, or the read raises.

        Example:
            >>> peak_info = DssCore.get_peak_value("results.dss", "//OUTLET/FLOW/.../")
            >>> print(f"Peak: {peak_info['peak_flow']} {peak_info['units']}")
        """
        # jnius may only be imported after the JVM options are in place.
        DssCore._configure_jvm()

        from jnius import autoclass, cast

        HecDss = autoclass('hec.heclib.dss.HecDss')
        TimeSeriesContainer = autoclass('hec.io.TimeSeriesContainer')

        source_path = Path(dss_file)

        if not source_path.exists():
            logger.warning(f"DSS file not found: {source_path}")
            return None

        handle = HecDss.open(str(source_path.resolve()))

        # Best effort: quieten the DSS library (0 = quiet ... 3 = verbose).
        # The call is optional because some DSS versions may lack the method.
        try:
            handle.setMessageLevel(0)
        except Exception:
            pass

        try:
            # Second argument True = ignore the D-part (date) of the pathname.
            record = handle.get(pathname, True)

            if record is None:
                logger.warning(f"No data found for pathname: {pathname}")
                return None

            # The generic container must be cast before its fields are usable.
            tsc = cast('hec.io.TimeSeriesContainer', record)

            values = np.array(tsc.values)  # Java double[]
            times = np.array(tsc.times)    # Java int[], minutes since 1899-12-31

            if len(values) == 0 or len(times) == 0:
                logger.warning(f"No data found in time series for pathname: {pathname}")
                return None

            if len(values) != len(times):
                logger.warning(
                    f"Mismatched array lengths: {len(values)} values, {len(times)} times"
                )
                return None

            # Position of the maximum; its timestamp comes from the parallel
            # times array.
            top = int(np.argmax(values))

            return {
                'peak_flow': float(values[top]),
                'peak_time': DssCore._hec_time_to_datetime(int(times[top])),
                'units': str(tsc.units) if tsc.units else "",
                'count': len(values),
                'min_value': float(np.min(values)),
                'mean_value': float(np.mean(values))
            }

        except Exception as e:
            logger.error(f"Error reading DSS pathname {pathname}: {e}")
            return None

        finally:
            handle.done()

    @staticmethod
    def read_multiple_timeseries(
        dss_file: Union[str, Path],
        pathnames: List[str]
    ) -> Dict[str, pd.DataFrame]:
        """
        Read multiple time series from DSS file.

        Args:
            dss_file: Path to DSS file
            pathnames: List of DSS pathnames

        Returns:
            Dictionary mapping pathnames to DataFrames (None on failure)

        Example:
            paths = ["/BASIN/LOC1/FLOW//1HOUR/OBS/", "/BASIN/LOC2/FLOW//1HOUR/OBS/"]
            data = DssCore.read_multiple_timeseries("file.dss", paths)
            for path, df in data.items():
                if df is not None:
                    print(f"{path}: {len(df)} points")
        """
        results = {}
        for pathname in pathnames:
            try:
                results[pathname] = DssCore.read_timeseries(dss_file, pathname)
            except Exception as e:
                logger.warning(f"Could not read {pathname}: {e}")
                results[pathname] = None

        return results

    @staticmethod
    def get_info(dss_file: Union[str, Path]) -> Dict[str, Any]:
        """
        Get summary information about DSS file.

        Args:
            dss_file: Path to DSS file

        Returns:
            Dictionary with file information

        Example:
            info = DssCore.get_info("sample.dss")
            print(f"Total paths: {info['total_paths']}")
            print(f"File size: {info['file_size_mb']:.2f} MB")
        """
        dss_path = Path(dss_file)

        if not dss_path.exists():
            return {
                'filepath': str(dss_path),
                'exists': False,
                'error': 'File not found'
            }

        catalog = DssCore.get_catalog(dss_file)

        # Categorize paths by C-part (data type)
        path_types = {}
        for path in catalog:
            parts = path.split('/')
            if len(parts) >= 4:
                data_type = parts[3]  # C part
                path_types[data_type] = path_types.get(data_type, 0) + 1

        return {
            'filepath': str(dss_path.resolve()),
            'filename': dss_path.name,
            'exists': True,
            'file_size_mb': dss_path.stat().st_size / (1024 * 1024),
            'total_paths': len(catalog),
            'path_types': path_types,
            'first_5_paths': catalog[:5] if len(catalog) > 5 else catalog,
        }

    @staticmethod
    def parse_pathname(pathname: str) -> Dict[str, str]:
        """
        Split a DSS pathname into its component parts.

        Thin wrapper: the work is done by the module-level ``parse_pathname``
        function, and its result is returned unchanged.

        DSS pathnames have format: /A/B/C/D/E/F/
        - A: Basin/Project identifier
        - B: Location/Element name
        - C: Data type (FLOW, PRECIP, etc.)
        - D: Date/Time block
        - E: Time interval
        - F: Version/Run identifier

        Args:
            pathname: DSS pathname string

        Returns:
            Dictionary with pathname components

        Example:
            parts = DssCore.parse_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
            print(parts['B'])  # 'OUTLET'
            print(parts['C'])  # 'FLOW'
        """
        # Inside a method body the bare name resolves to the module-level
        # function, not to this static method, so this is not recursive.
        components = parse_pathname(pathname)
        return components

    @staticmethod
    def create_pathname(
        basin: str,
        element: str,
        data_type: str,
        interval: str,
        run_name: str = "",
        date_block: str = ""
    ) -> str:
        """
        Assemble a DSS pathname from its parts.

        Thin wrapper: the arguments are forwarded positionally, in the same
        order, to the module-level ``create_pathname`` function.

        Args:
            basin: Basin/Project name (A part)
            element: Element name (B part)
            data_type: Data type like FLOW, PRECIP (C part)
            interval: Time interval like 15MIN, 1HOUR (E part)
            run_name: Run identifier (F part)
            date_block: Date block (D part, usually empty)

        Returns:
            Formatted DSS pathname

        Example:
            path = DssCore.create_pathname(
                "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1"
            )
            print(path)  # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'
        """
        # Inside a method body the bare name resolves to the module-level
        # function, not to this static method, so this is not recursive.
        built = create_pathname(
            basin, element, data_type, interval, run_name, date_block
        )
        return built

    @staticmethod
    def filter_catalog(
        catalog: List[str],
        pattern: Optional[str] = None,
        data_type: Optional[str] = None,
        element: Optional[str] = None
    ) -> List[str]:
        """
        Narrow a DSS catalog by regex pattern and/or pathname parts.

        Thin wrapper: the arguments are forwarded positionally, in the same
        order, to the module-level ``filter_catalog`` function.

        Args:
            catalog: List of DSS pathnames
            pattern: Regex pattern to match against full pathname
            data_type: Filter by C-part (e.g., "FLOW", "PRECIP")
            element: Filter by B-part (element/location name)

        Returns:
            Filtered list of pathnames

        Example:
            paths = DssCore.get_catalog("file.dss")
            flow_paths = DssCore.filter_catalog(paths, data_type="FLOW")
        """
        # Inside a method body the bare name resolves to the module-level
        # function, not to this static method, so this is not recursive.
        matches = filter_catalog(catalog, pattern, data_type, element)
        return matches

    @staticmethod
    def write_paired_data(
        dss_file: Union[str, Path],
        pathname: str,
        x_values: np.ndarray,
        y_values: np.ndarray,
        x_units: str = "HOURS",
        y_units: str = "FRACTION",
        x_label: str = "TIME",
        y_label: str = "CUMULATIVE"
    ) -> bool:
        """
        Write paired data (X-Y curve) to DSS file.

        This is used for Atlas 14 temporal distributions, rating curves,
        and other X-Y relationships.

        The inputs are validated *before* the DSS file is opened, so invalid
        data neither creates a new file nor writes a malformed record.

        Args:
            dss_file: Path to DSS file (created if doesn't exist)
            pathname: DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")
            x_values: X coordinates (e.g., time in hours). Any 1-D array-like
                convertible to float64 (numpy array, list, pandas Series).
            y_values: Y coordinates (e.g., cumulative fraction 0-1). Must be
                1-D and the same length as ``x_values``.
            x_units: Units for X values (default: "HOURS")
            y_units: Units for Y values (default: "FRACTION")
            x_label: Label for X axis (default: "TIME")
            y_label: Label for Y axis (default: "CUMULATIVE")

        Returns:
            True if write succeeded, False otherwise (invalid input or an
            error while building/writing the record; the reason is logged)

        Raises:
            ImportError, RuntimeError: Propagated from JVM configuration.

        Example:
            >>> x = np.linspace(0, 24, 49)  # 0 to 24 hours
            >>> y = np.linspace(0, 1, 49)   # 0 to 100% cumulative
            >>> DssCore.write_paired_data(
            ...     "temporal.dss",
            ...     "//TX_R3/ALL-CASES/24HR///50%/",
            ...     x, y
            ... )
        """
        # Configure JVM (must be before first jnius import)
        DssCore._configure_jvm()

        # Import Java classes via pyjnius (lazy)
        from jnius import autoclass

        HecDss = autoclass('hec.heclib.dss.HecDss')
        PairedDataContainer = autoclass('hec.io.PairedDataContainer')

        # Coerce to float64 arrays up front; np.asarray also accepts plain
        # lists/Series, which the previous ``.astype`` call did not.
        try:
            x_arr = np.asarray(x_values, dtype=np.float64)
            y_arr = np.asarray(y_values, dtype=np.float64)
        except (TypeError, ValueError) as e:
            logger.error(f"Error writing paired data to {pathname}: {e}")
            return False

        # A single curve needs exactly one Y per X. numberOrdinates is taken
        # from X, so a length mismatch would describe the record incorrectly.
        if x_arr.ndim != 1 or y_arr.ndim != 1 or x_arr.size != y_arr.size:
            logger.error(
                f"Error writing paired data to {pathname}: x_values and y_values "
                f"must be 1-D with equal length (got shapes {x_arr.shape} "
                f"and {y_arr.shape})"
            )
            return False

        dss_file = str(Path(dss_file).resolve())

        # Open DSS file (creates if doesn't exist)
        dss = HecDss.open(dss_file)

        # Suppress DSS library verbose output
        try:
            dss.setMessageLevel(0)
        except Exception:
            pass

        try:
            # Create PairedDataContainer
            container = PairedDataContainer()

            # Use setter methods (pyjnius requires explicit setters, not property assignment)
            container.setFullName(pathname)
            container.setXUnits(x_units)
            container.setYUnits(y_units)
            container.setXType(x_label)
            container.setYType(y_label)

            # X ordinates: 1D array (double[])
            x_list = x_arr.tolist()

            # Y ordinates: 2D array (double[numberCurves][numberOrdinates])
            # For single curve, wrap in outer list: [[y1, y2, ...]]
            # See: https://www.hec.usace.army.mil/confluence/dssdocs/dssjavaprogrammer/paired-data
            y_2d = [y_arr.tolist()]  # Single curve: yOrdinates[1][n]

            container.setXOrdinates(x_list)
            container.setYOrdinates(y_2d)
            container.setNumberOrdinates(len(x_list))
            container.setNumberCurves(1)

            # Write to DSS file
            dss.put(container)

            logger.info(f"Wrote paired data to {pathname}")
            return True

        except Exception as e:
            logger.error(f"Error writing paired data to {pathname}: {e}")
            return False

        finally:
            dss.done()

    @staticmethod
    def write_multiple_paired_data(
        dss_file: Union[str, Path],
        paired_data_records: List[Dict[str, Any]]
    ) -> Dict[str, bool]:
        """
        Write multiple paired data records to DSS file.

        More efficient than calling write_paired_data repeatedly as it
        keeps the DSS file open for all writes.

        Each record is validated and written independently: a record with
        missing or malformed data is logged, marked False, and the remaining
        records are still processed.

        Args:
            dss_file: Path to DSS file
            paired_data_records: List of dicts with keys:
                - pathname: DSS pathname
                - x_values: 1-D array-like of X values (convertible to float64)
                - y_values: 1-D array-like of Y values, same length as x_values
                - x_units: (optional) X units, default "HOURS"
                - y_units: (optional) Y units, default "FRACTION"
                - x_label: (optional) X label, default "TIME"
                - y_label: (optional) Y label, default "CUMULATIVE"

        Returns:
            Dict mapping pathname to success status (True/False). If the same
            pathname appears more than once, the last record's status wins.

        Raises:
            KeyError: If a record has no 'pathname' key (there is no key to
                report the failure under); the DSS file is still closed.
            ImportError, RuntimeError: Propagated from JVM configuration.

        Example:
            >>> records = [
            ...     {"pathname": "//A/B/C///D/", "x_values": x1, "y_values": y1},
            ...     {"pathname": "//A/E/C///D/", "x_values": x2, "y_values": y2},
            ... ]
            >>> results = DssCore.write_multiple_paired_data("file.dss", records)
        """
        # Configure JVM (must be before first jnius import)
        DssCore._configure_jvm()

        # Import Java classes via pyjnius (lazy)
        from jnius import autoclass

        HecDss = autoclass('hec.heclib.dss.HecDss')
        PairedDataContainer = autoclass('hec.io.PairedDataContainer')

        dss_file = str(Path(dss_file).resolve())
        results = {}

        # Open DSS file once for all writes
        dss = HecDss.open(dss_file)

        try:
            dss.setMessageLevel(0)
        except Exception:
            pass

        try:
            for record in paired_data_records:
                pathname = record['pathname']
                try:
                    # Coerce first; np.asarray accepts lists/Series as well as
                    # numpy arrays, and raises for non-numeric content.
                    x_arr = np.asarray(record['x_values'], dtype=np.float64)
                    y_arr = np.asarray(record['y_values'], dtype=np.float64)

                    # A single curve needs exactly one Y per X; numberOrdinates
                    # is taken from X, so reject mismatched shapes up front.
                    if x_arr.ndim != 1 or y_arr.ndim != 1 or x_arr.size != y_arr.size:
                        raise ValueError(
                            "x_values and y_values must be 1-D with equal length "
                            f"(got shapes {x_arr.shape} and {y_arr.shape})"
                        )

                    # Create container with setter methods
                    container = PairedDataContainer()
                    container.setFullName(pathname)
                    container.setXUnits(record.get('x_units', 'HOURS'))
                    container.setYUnits(record.get('y_units', 'FRACTION'))
                    container.setXType(record.get('x_label', 'TIME'))
                    container.setYType(record.get('y_label', 'CUMULATIVE'))

                    # X ordinates: 1D array (double[])
                    x_list = x_arr.tolist()

                    # Y ordinates: 2D array (double[numberCurves][numberOrdinates])
                    y_2d = [y_arr.tolist()]  # Single curve: yOrdinates[1][n]

                    container.setXOrdinates(x_list)
                    container.setYOrdinates(y_2d)
                    container.setNumberOrdinates(len(x_list))
                    container.setNumberCurves(1)

                    # Write to DSS
                    dss.put(container)
                    results[pathname] = True

                except Exception as e:
                    logger.error(f"Error writing {pathname}: {e}")
                    results[pathname] = False

            logger.info(f"Wrote {sum(results.values())}/{len(results)} paired data records")

        finally:
            dss.done()

        return results

    @staticmethod
    def read_paired_data(
        dss_file: Union[str, Path],
        pathname: str
    ) -> Optional[Dict[str, Any]]:
        """
        Load one paired-data (X-Y curve) record from a DSS file.

        Only the first curve of the record's Y ordinates is returned.

        Args:
            dss_file: Location of the DSS file on disk
            pathname: DSS pathname of the paired-data record

        Returns:
            A dictionary holding:
                - x_values: numpy array built from the X ordinates
                - y_values: numpy array built from the first Y curve
                - x_units: X axis units ("" when unset)
                - y_units: Y axis units ("" when unset)
                - x_label: X axis type ("" when unset)
                - y_label: Y axis type ("" when unset)
                - pathname: the pathname that was requested
            None when the file is missing, no record is found, the record
            has no Y ordinates, or any error is raised while reading

        Example:
            >>> data = DssCore.read_paired_data(
            ...     "file.dss",
            ...     "//ELEMENT/FLOW-DIVERSION///TABLE/"
            ... )
            >>> print(f"X: {data['x_values']}")
            >>> print(f"Y: {data['y_values']}")
        """
        # JVM configuration has to happen ahead of the first jnius import
        DssCore._configure_jvm()

        # pyjnius is imported lazily so Java is only needed when actually used
        from jnius import autoclass, cast

        hec_dss_cls = autoclass('hec.heclib.dss.HecDss')
        # Resolved up front so a missing class fails before the file is opened
        autoclass('hec.io.PairedDataContainer')

        dss_file = Path(dss_file)
        if not dss_file.exists():
            logger.warning(f"DSS file not found: {dss_file}")
            return None

        handle = hec_dss_cls.open(str(dss_file.resolve()))

        # Best effort only: quiet the DSS library, ignoring any failure
        try:
            handle.setMessageLevel(0)
        except Exception:
            pass

        def _text(field):
            # Falsy metadata (None or empty) is reported as an empty string
            return str(field) if field else ""

        try:
            # Second argument True = ignore D-part for wildcard matching
            record = handle.get(pathname, True)
            if record is None:
                logger.warning(f"No data found for pathname: {pathname}")
                return None

            # The generic container must be cast before its fields are readable
            paired = cast('hec.io.PairedDataContainer', record)

            xs = np.array(paired.xOrdinates)

            # Y ordinates are laid out as [curves][ordinates]; keep curve 0 only
            curves = paired.yOrdinates
            if curves is None:
                logger.warning(f"No Y ordinates found for pathname: {pathname}")
                return None
            ys = np.array(curves[0])

            return {
                'x_values': xs,
                'y_values': ys,
                'x_units': _text(paired.xUnits),
                'y_units': _text(paired.yUnits),
                'x_label': _text(paired.xType),
                'y_label': _text(paired.yType),
                'pathname': pathname,
            }

        except Exception as e:
            logger.error(f"Error reading paired data from {pathname}: {e}")
            return None

        finally:
            # Always release the file handle, whatever happened above
            handle.done()

    @staticmethod
    def shutdown_jvm():
        """
        Placeholder kept for API compatibility; does not stop the JVM.

        The call only emits an informational log message. With pyjnius an
        explicit JVM shutdown is typically not needed.
        """
        logger.info("pyjnius handles JVM lifecycle automatically")

is_available() staticmethod

Check if DSS functionality is available.

Returns:

Type Description
bool

True if pyjnius can be imported

Source code in hms_commander/dss/core.py
@staticmethod
def is_available() -> bool:
    """
    Check if DSS functionality is available.

    Returns:
        True if pyjnius can be imported
    """
    try:
        import jnius_config
        return True
    except ImportError:
        return False

get_catalog(dss_file) staticmethod

Get list of all data paths in DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required

Returns:

Type Description
List[str]

List of DSS path strings

Example

paths = DssCore.get_catalog("sample.dss")
for path in paths:
    print(path)

Source code in hms_commander/dss/core.py
@staticmethod
def get_catalog(dss_file: Union[str, Path]) -> List[str]:
    """
    List every pathname catalogued in a DSS file.

    Args:
        dss_file: Path to DSS file

    Returns:
        The catalogued pathnames as Python strings, in catalog order

    Example:
        paths = DssCore.get_catalog("sample.dss")
        for path in paths:
            print(path)
    """
    # JVM configuration has to happen ahead of the first jnius import
    DssCore._configure_jvm()

    # pyjnius is imported lazily so Java is only needed when actually used
    from jnius import autoclass

    hec_dss_cls = autoclass('hec.heclib.dss.HecDss')
    handle = hec_dss_cls.open(str(Path(dss_file).resolve()))

    # Best effort: lower the library's message level (0 = quiet).
    # Older DSS versions may lack this method, so any failure is ignored.
    try:
        handle.setMessageLevel(0)
    except Exception:
        pass

    try:
        # The catalog comes back as a Java Vector; copy it element by element
        entries = handle.getCatalogedPathnames()
        return [str(entries.get(idx)) for idx in range(entries.size())]
    finally:
        # Release the file handle even if the catalog call failed
        handle.done()

read_timeseries(dss_file, pathname, start_date=None, end_date=None) staticmethod

Read time series from DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
pathname str

DSS pathname (e.g., "/BASIN/LOC/FLOW//1HOUR/OBS/")

required
start_date Optional[str]

Optional start date filter

None
end_date Optional[str]

Optional end date filter

None

Returns:

Type Description
DataFrame

pandas DataFrame with:

  • DatetimeIndex for time series operations
  • 'datetime' column for plotting (same as index)
  • 'value' column with time series data
  • Metadata via df.attrs: pathname, units, type, interval, dss_file

Example

df = DssCore.read_timeseries("file.dss", "/BASIN/LOC/FLOW//1HOUR/OBS/")
print(df.head())
print(f"Units: {df.attrs['units']}")

Plotting options:

ax.plot(df.index, df['value'])          # Using DatetimeIndex
ax.plot(df['datetime'], df['value'])    # Using datetime column
df.plot(y='value')                      # Pandas automatic

Source code in hms_commander/dss/core.py
@staticmethod
def read_timeseries(
    dss_file: Union[str, Path],
    pathname: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None
) -> pd.DataFrame:
    """
    Read time series from DSS file.

    Args:
        dss_file: Path to DSS file
        pathname: DSS pathname (e.g., "/BASIN/LOC/FLOW//1HOUR/OBS/")
        start_date: Optional inclusive lower bound. Any timezone-naive value
            accepted by ``pandas.to_datetime``; earlier rows are dropped.
        end_date: Optional inclusive upper bound. Any timezone-naive value
            accepted by ``pandas.to_datetime``; later rows are dropped.
            A date without a time component means midnight of that day.

    Returns:
        pandas DataFrame with:
        - DatetimeIndex for time series operations
        - 'datetime' column for plotting (same as index)
        - 'value' column with time series data
        - Metadata via df.attrs: pathname, units, type, interval, dss_file
        The frame is empty when the date bounds exclude every sample.

    Raises:
        ValueError: If no record is found for ``pathname``, the record holds
            no samples, or its value and time arrays differ in length.

    Example:
        df = DssCore.read_timeseries("file.dss", "/BASIN/LOC/FLOW//1HOUR/OBS/")
        print(df.head())
        print(f"Units: {df.attrs['units']}")

        # Plotting options:
        ax.plot(df.index, df['value'])          # Using DatetimeIndex
        ax.plot(df['datetime'], df['value'])    # Using datetime column
        df.plot(y='value')                      # Pandas automatic
    """
    # Configure JVM (must be before first jnius import)
    DssCore._configure_jvm()

    # Import Java classes via pyjnius (lazy)
    from jnius import autoclass, cast

    HecDss = autoclass('hec.heclib.dss.HecDss')

    dss_file = str(Path(dss_file).resolve())

    # Open DSS file
    dss = HecDss.open(dss_file)

    # Suppress DSS library verbose output (ZREAD/ZOPEN messages)
    # Note: DSS Fortran library writes directly to stdout and cannot be suppressed
    try:
        dss.setMessageLevel(0)  # 0 = quiet, 1 = errors only, 2 = warnings, 3 = verbose
    except Exception:
        pass  # Older DSS versions may not have this method

    try:
        # True = ignore D-part (date) for wildcards
        container = dss.get(pathname, True)

        if container is None:
            raise ValueError(f"No data found for pathname: {pathname}")

        # Cast to TimeSeriesContainer to access fields
        tsc = cast('hec.io.TimeSeriesContainer', container)

        values = np.array(tsc.values)  # Java double[] -> numpy array
        times = np.array(tsc.times)    # Java int[] -> numpy array (minutes since 1899-12-31)

        # Validate that we got data
        if len(values) == 0 or len(times) == 0:
            raise ValueError(f"No data found in time series for pathname: {pathname}")

        if len(values) != len(times):
            raise ValueError(
                f"Mismatched array lengths: {len(values)} values, {len(times)} times"
            )

        # HEC times count minutes from the epoch 1899-12-31 00:00:00.
        # Adding the whole TimedeltaIndex at once avoids a Python-level loop.
        hec_epoch = pd.Timestamp(np.datetime64('1899-12-31T00:00:00', 'ns'))
        datetimes = pd.DatetimeIndex(
            hec_epoch + pd.to_timedelta(times.astype('int64'), unit='m')
        )

        # Apply the optional inclusive date window before building the frame,
        # so values and index are trimmed with one shared mask
        if start_date is not None or end_date is not None:
            keep = np.ones(len(datetimes), dtype=bool)
            if start_date is not None:
                keep &= np.asarray(datetimes >= pd.to_datetime(start_date))
            if end_date is not None:
                keep &= np.asarray(datetimes <= pd.to_datetime(end_date))
            datetimes = datetimes[keep]
            values = values[keep]

        # Create DataFrame with DatetimeIndex for time series operations
        df = pd.DataFrame({'value': values}, index=datetimes)

        # Also expose the timestamps as a column for easier plotting:
        # ax.plot(df['datetime'], df['value']) or df.plot(x='datetime', y='value')
        df.insert(0, 'datetime', df.index)

        # Add metadata as attributes
        df.attrs['pathname'] = pathname
        df.attrs['units'] = str(tsc.units) if tsc.units else ""
        df.attrs['type'] = str(tsc.type) if tsc.type else ""
        df.attrs['interval'] = (
            int(tsc.interval) if hasattr(tsc, 'interval') else None
        )
        df.attrs['dss_file'] = dss_file

        return df

    finally:
        # Release the file handle on success and on every error path
        dss.done()

get_peak_value(dss_file, pathname) staticmethod

Extract ONLY peak value from DSS without loading full time series.

Uses HecDss API to read values array, calculates statistics immediately in NumPy, and returns only peak metadata. This avoids DataFrame overhead.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
pathname str

DSS pathname to read

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary with:

  - peak_flow: Maximum value (float)
  - peak_time: Time of maximum (datetime)
  - units: Engineering units (str)
  - count: Number of timesteps (int)
  - min_value: Minimum value (float)
  - mean_value: Mean value (float)

Optional[Dict[str, Any]]

Returns None if path doesn't exist or read fails

Memory: ~200 bytes vs ~70 KB for full time series

Example

peak_info = DssCore.get_peak_value("results.dss", "//OUTLET/FLOW/.../")
print(f"Peak: {peak_info['peak_flow']} {peak_info['units']}")

Source code in hms_commander/dss/core.py
@staticmethod
def get_peak_value(
    dss_file: Union[str, Path],
    pathname: str
) -> Optional[Dict[str, Any]]:
    """
    Summarize one DSS time series (peak, minimum, mean) without a DataFrame.

    The record's value and time arrays are read into NumPy, the statistics
    are computed there, and only the summary dictionary is returned.

    Args:
        dss_file: Path to DSS file
        pathname: DSS pathname to read

    Returns:
        Dictionary with:
            - peak_flow: Maximum value (float)
            - peak_time: Time of maximum, as converted by
              ``DssCore._hec_time_to_datetime``
            - units: Engineering units (str, "" when unset)
            - count: Number of timesteps (int)
            - min_value: Minimum value (float)
            - mean_value: Mean value (float)
        None when the file is missing, no record is found, the record is
        empty or inconsistent, or any error is raised while reading

    Example:
        >>> peak_info = DssCore.get_peak_value("results.dss", "//OUTLET/FLOW/.../")
        >>> print(f"Peak: {peak_info['peak_flow']} {peak_info['units']}")
    """
    # JVM configuration has to happen ahead of the first jnius import
    DssCore._configure_jvm()

    # pyjnius is imported lazily so Java is only needed when actually used
    from jnius import autoclass, cast

    hec_dss_cls = autoclass('hec.heclib.dss.HecDss')
    # Resolved up front so a missing class fails before the file is opened
    autoclass('hec.io.TimeSeriesContainer')

    dss_file = Path(dss_file)
    if not dss_file.exists():
        logger.warning(f"DSS file not found: {dss_file}")
        return None

    handle = hec_dss_cls.open(str(dss_file.resolve()))

    # Best effort: lower the library's message level (0 = quiet).
    # Older DSS versions may lack this method, so any failure is ignored.
    try:
        handle.setMessageLevel(0)
    except Exception:
        pass

    try:
        # Second argument True = ignore D-part (date) for wildcards
        record = handle.get(pathname, True)
        if record is None:
            logger.warning(f"No data found for pathname: {pathname}")
            return None

        # The generic container must be cast before its fields are readable
        tsc = cast('hec.io.TimeSeriesContainer', record)

        values = np.array(tsc.values)  # Java double[]
        times = np.array(tsc.times)    # Java int[] (minutes since 1899-12-31)

        # Either array being empty means there is nothing to summarize
        if not (len(values) and len(times)):
            logger.warning(f"No data found in time series for pathname: {pathname}")
            return None

        if len(values) != len(times):
            logger.warning(
                f"Mismatched array lengths: {len(values)} values, {len(times)} times"
            )
            return None

        # Locate the maximum, then translate its HEC timestamp
        where_max = int(np.argmax(values))
        highest = float(values[where_max])
        highest_at = DssCore._hec_time_to_datetime(int(times[where_max]))

        lowest = float(np.min(values))
        average = float(np.mean(values))
        n_steps = len(values)

        unit_text = str(tsc.units) if tsc.units else ""

        return {
            'peak_flow': highest,
            'peak_time': highest_at,
            'units': unit_text,
            'count': n_steps,
            'min_value': lowest,
            'mean_value': average,
        }

    except Exception as e:
        logger.error(f"Error reading DSS pathname {pathname}: {e}")
        return None

    finally:
        # Release the file handle, whatever happened above
        handle.done()

read_multiple_timeseries(dss_file, pathnames) staticmethod

Read multiple time series from DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
pathnames List[str]

List of DSS pathnames

required

Returns:

Type Description
Dict[str, DataFrame]

Dictionary mapping pathnames to DataFrames (None on failure)

Example

paths = ["/BASIN/LOC1/FLOW//1HOUR/OBS/", "/BASIN/LOC2/FLOW//1HOUR/OBS/"]
data = DssCore.read_multiple_timeseries("file.dss", paths)
for path, df in data.items():
    if df is not None:
        print(f"{path}: {len(df)} points")

Source code in hms_commander/dss/core.py
@staticmethod
def read_multiple_timeseries(
    dss_file: Union[str, Path],
    pathnames: List[str]
) -> Dict[str, pd.DataFrame]:
    """
    Read multiple time series from DSS file.

    Args:
        dss_file: Path to DSS file
        pathnames: List of DSS pathnames

    Returns:
        Dictionary mapping pathnames to DataFrames (None on failure)

    Example:
        paths = ["/BASIN/LOC1/FLOW//1HOUR/OBS/", "/BASIN/LOC2/FLOW//1HOUR/OBS/"]
        data = DssCore.read_multiple_timeseries("file.dss", paths)
        for path, df in data.items():
            if df is not None:
                print(f"{path}: {len(df)} points")
    """
    results = {}
    for pathname in pathnames:
        try:
            results[pathname] = DssCore.read_timeseries(dss_file, pathname)
        except Exception as e:
            logger.warning(f"Could not read {pathname}: {e}")
            results[pathname] = None

    return results

get_info(dss_file) staticmethod

Get summary information about DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required

Returns:

Type Description
Dict[str, Any]

Dictionary with file information

Example

info = DssCore.get_info("sample.dss")
print(f"Total paths: {info['total_paths']}")
print(f"File size: {info['file_size_mb']:.2f} MB")

Source code in hms_commander/dss/core.py
@staticmethod
def get_info(dss_file: Union[str, Path]) -> Dict[str, Any]:
    """
    Get summary information about DSS file.

    Args:
        dss_file: Path to DSS file

    Returns:
        Dictionary with file information

    Example:
        info = DssCore.get_info("sample.dss")
        print(f"Total paths: {info['total_paths']}")
        print(f"File size: {info['file_size_mb']:.2f} MB")
    """
    dss_path = Path(dss_file)

    if not dss_path.exists():
        return {
            'filepath': str(dss_path),
            'exists': False,
            'error': 'File not found'
        }

    catalog = DssCore.get_catalog(dss_file)

    # Categorize paths by C-part (data type)
    path_types = {}
    for path in catalog:
        parts = path.split('/')
        if len(parts) >= 4:
            data_type = parts[3]  # C part
            path_types[data_type] = path_types.get(data_type, 0) + 1

    return {
        'filepath': str(dss_path.resolve()),
        'filename': dss_path.name,
        'exists': True,
        'file_size_mb': dss_path.stat().st_size / (1024 * 1024),
        'total_paths': len(catalog),
        'path_types': path_types,
        'first_5_paths': catalog[:5] if len(catalog) > 5 else catalog,
    }

parse_pathname(pathname) staticmethod

Parse a DSS pathname into its component parts.

DSS pathnames have format: /A/B/C/D/E/F/

  - A: Basin/Project identifier
  - B: Location/Element name
  - C: Data type (FLOW, PRECIP, etc.)
  - D: Date/Time block
  - E: Time interval
  - F: Version/Run identifier

Parameters:

Name Type Description Default
pathname str

DSS pathname string

required

Returns:

Type Description
Dict[str, str]

Dictionary with pathname components

Example

parts = DssCore.parse_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
print(parts['B'])  # 'OUTLET'
print(parts['C'])  # 'FLOW'

Source code in hms_commander/dss/core.py
@staticmethod
def parse_pathname(pathname: str) -> Dict[str, str]:
    """
    Parse a DSS pathname into its component parts.

    Thin wrapper: forwards ``pathname`` unchanged to the ``parse_pathname``
    function resolved at module scope and returns its result.

    DSS pathnames have format: /A/B/C/D/E/F/
    - A: Basin/Project identifier
    - B: Location/Element name
    - C: Data type (FLOW, PRECIP, etc.)
    - D: Date/Time block
    - E: Time interval
    - F: Version/Run identifier

    Args:
        pathname: DSS pathname string

    Returns:
        Dictionary with pathname components (the example below reads the
        'B' and 'C' entries)

    Example:
        parts = DssCore.parse_pathname("/BASIN/OUTLET/FLOW//15MIN/RUN:RUN1/")
        print(parts['B'])  # 'OUTLET'
        print(parts['C'])  # 'FLOW'
    """
    # Bare name: resolves to the module-level helper, not to this method
    return parse_pathname(pathname)

create_pathname(basin, element, data_type, interval, run_name='', date_block='') staticmethod

Create a DSS pathname from components.

Parameters:

Name Type Description Default
basin str

Basin/Project name (A part)

required
element str

Element name (B part)

required
data_type str

Data type like FLOW, PRECIP (C part)

required
interval str

Time interval like 15MIN, 1HOUR (E part)

required
run_name str

Run identifier (F part)

''
date_block str

Date block (D part, usually empty)

''

Returns:

Type Description
str

Formatted DSS pathname

Example

path = DssCore.create_pathname(
    "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1"
)
print(path)  # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'

Source code in hms_commander/dss/core.py
@staticmethod
def create_pathname(
    basin: str,
    element: str,
    data_type: str,
    interval: str,
    run_name: str = "",
    date_block: str = ""
) -> str:
    """
    Create a DSS pathname from components.

    Thin wrapper: forwards all arguments positionally, in the same order,
    to the ``create_pathname`` function resolved at module scope.

    Args:
        basin: Basin/Project name (A part)
        element: Element name (B part)
        data_type: Data type like FLOW, PRECIP (C part)
        interval: Time interval like 15MIN, 1HOUR (E part)
        run_name: Run identifier (F part); defaults to ""
        date_block: Date block (D part, usually empty); defaults to ""

    Returns:
        Formatted DSS pathname

    Example:
        path = DssCore.create_pathname(
            "MYBASIN", "OUTLET", "FLOW", "15MIN", "RUN1"
        )
        print(path)  # '/MYBASIN/OUTLET/FLOW//15MIN/RUN:RUN1/'
    """
    # Bare name: resolves to the module-level helper, not to this method
    return create_pathname(basin, element, data_type, interval, run_name, date_block)

filter_catalog(catalog, pattern=None, data_type=None, element=None) staticmethod

Filter DSS catalog by pattern or components.

Parameters:

Name Type Description Default
catalog List[str]

List of DSS pathnames

required
pattern Optional[str]

Regex pattern to match against full pathname

None
data_type Optional[str]

Filter by C-part (e.g., "FLOW", "PRECIP")

None
element Optional[str]

Filter by B-part (element/location name)

None

Returns:

Type Description
List[str]

Filtered list of pathnames

Example

paths = DssCore.get_catalog("file.dss")
flow_paths = DssCore.filter_catalog(paths, data_type="FLOW")

Source code in hms_commander/dss/core.py
@staticmethod
def filter_catalog(
    catalog: List[str],
    pattern: Optional[str] = None,
    data_type: Optional[str] = None,
    element: Optional[str] = None
) -> List[str]:
    """
    Filter DSS catalog by pattern or components.

    Thin wrapper: forwards all arguments positionally, in the same order,
    to the ``filter_catalog`` function resolved at module scope.

    Args:
        catalog: List of DSS pathnames
        pattern: Regex pattern to match against full pathname
        data_type: Filter by C-part (e.g., "FLOW", "PRECIP")
        element: Filter by B-part (element/location name)

    Returns:
        Filtered list of pathnames

    Example:
        paths = DssCore.get_catalog("file.dss")
        flow_paths = DssCore.filter_catalog(paths, data_type="FLOW")
    """
    # Bare name: resolves to the module-level helper, not to this method
    return filter_catalog(catalog, pattern, data_type, element)

write_paired_data(dss_file, pathname, x_values, y_values, x_units='HOURS', y_units='FRACTION', x_label='TIME', y_label='CUMULATIVE') staticmethod

Write paired data (X-Y curve) to DSS file.

This is used for Atlas 14 temporal distributions, rating curves, and other X-Y relationships.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file (created if doesn't exist)

required
pathname str

DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")

required
x_values ndarray

X coordinates (e.g., time in hours)

required
y_values ndarray

Y coordinates (e.g., cumulative fraction 0-1)

required
x_units str

Units for X values (default: "HOURS")

'HOURS'
y_units str

Units for Y values (default: "FRACTION")

'FRACTION'
x_label str

Label for X axis (default: "TIME")

'TIME'
y_label str

Label for Y axis (default: "CUMULATIVE")

'CUMULATIVE'

Returns:

Type Description
bool

True if write succeeded, False otherwise

Example

x = np.linspace(0, 24, 49)  # 0 to 24 hours
y = np.linspace(0, 1, 49)   # 0 to 100% cumulative
DssCore.write_paired_data(
    "temporal.dss",
    "//TX_R3/ALL-CASES/24HR///50%/",
    x, y
)

Source code in hms_commander/dss/core.py
@staticmethod
def write_paired_data(
    dss_file: Union[str, Path],
    pathname: str,
    x_values: np.ndarray,
    y_values: np.ndarray,
    x_units: str = "HOURS",
    y_units: str = "FRACTION",
    x_label: str = "TIME",
    y_label: str = "CUMULATIVE"
) -> bool:
    """
    Write paired data (X-Y curve) to DSS file.

    This is used for Atlas 14 temporal distributions, rating curves,
    and other X-Y relationships. A single curve is written per call.

    Args:
        dss_file: Path to DSS file (created if doesn't exist)
        pathname: DSS pathname (e.g., "//TX_R3/FIRST-QUARTILE/24HR///50%/")
        x_values: X coordinates (e.g., time in hours); a 1-D numpy array or
            any sequence convertible to one
        y_values: Y coordinates (e.g., cumulative fraction 0-1); must be 1-D
            with the same length as ``x_values``
        x_units: Units for X values (default: "HOURS")
        y_units: Units for Y values (default: "FRACTION")
        x_label: Label for X axis (default: "TIME")
        y_label: Label for Y axis (default: "CUMULATIVE")

    Returns:
        True if write succeeded, False otherwise. Invalid inputs
        (non-numeric, not 1-D, or X/Y length mismatch) are logged and
        return False before the DSS file is opened.

    Example:
        >>> x = np.linspace(0, 24, 49)  # 0 to 24 hours
        >>> y = np.linspace(0, 1, 49)   # 0 to 100% cumulative
        >>> DssCore.write_paired_data(
        ...     "temporal.dss",
        ...     "//TX_R3/ALL-CASES/24HR///50%/",
        ...     x, y
        ... )
    """
    # Validate first, so bad input never creates or touches the DSS file.
    # np.asarray also accepts plain lists/tuples, not only ndarrays.
    try:
        x_arr = np.asarray(x_values, dtype=np.float64)
        y_arr = np.asarray(y_values, dtype=np.float64)
        if x_arr.ndim != 1 or x_arr.shape != y_arr.shape:
            raise ValueError(
                "x_values and y_values must be 1-D with equal length "
                f"(got shapes {x_arr.shape} and {y_arr.shape})"
            )
    except (TypeError, ValueError) as e:
        logger.error(f"Error writing paired data to {pathname}: {e}")
        return False

    # Configure JVM (must be before first jnius import)
    DssCore._configure_jvm()

    # Import Java classes via pyjnius (lazy)
    from jnius import autoclass

    HecDss = autoclass('hec.heclib.dss.HecDss')
    PairedDataContainer = autoclass('hec.io.PairedDataContainer')

    dss_file = str(Path(dss_file).resolve())

    # Open DSS file (creates if doesn't exist)
    dss = HecDss.open(dss_file)

    # Suppress DSS library verbose output; best effort only
    try:
        dss.setMessageLevel(0)
    except Exception:
        pass

    try:
        container = PairedDataContainer()

        # Use setter methods (pyjnius requires explicit setters, not property assignment)
        container.setFullName(pathname)
        container.setXUnits(x_units)
        container.setYUnits(y_units)
        container.setXType(x_label)
        container.setYType(y_label)

        # X ordinates: 1D array (double[])
        container.setXOrdinates(x_arr.tolist())

        # Y ordinates: 2D array (double[numberCurves][numberOrdinates]).
        # For a single curve, wrap in an outer list: [[y1, y2, ...]]
        # See: https://www.hec.usace.army.mil/confluence/dssdocs/dssjavaprogrammer/paired-data
        container.setYOrdinates([y_arr.tolist()])
        container.setNumberOrdinates(int(x_arr.size))
        container.setNumberCurves(1)

        # Write to DSS file
        dss.put(container)

        logger.info(f"Wrote paired data to {pathname}")
        return True

    except Exception as e:
        logger.error(f"Error writing paired data to {pathname}: {e}")
        return False

    finally:
        # Release the file handle on success and on every error path
        dss.done()

write_multiple_paired_data(dss_file, paired_data_records) staticmethod

Write multiple paired data records to DSS file.

More efficient than calling write_paired_data repeatedly as it keeps the DSS file open for all writes.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
paired_data_records List[Dict[str, Any]]

List of dicts with keys:

  - pathname: DSS pathname
  - x_values: numpy array of X values
  - y_values: numpy array of Y values
  - x_units: (optional) X units
  - y_units: (optional) Y units
  - x_label: (optional) X label
  - y_label: (optional) Y label

required

Returns:

Type Description
Dict[str, bool]

Dict mapping pathname to success status (True/False)

Example

records = [
    {"pathname": "//A/B/C///D/", "x_values": x1, "y_values": y1},
    {"pathname": "//A/E/C///D/", "x_values": x2, "y_values": y2},
]
results = DssCore.write_multiple_paired_data("file.dss", records)

Source code in hms_commander/dss/core.py
@staticmethod
def write_multiple_paired_data(
    dss_file: Union[str, Path],
    paired_data_records: List[Dict[str, Any]]
) -> Dict[str, bool]:
    """
    Write multiple paired data records to DSS file.

    More efficient than calling write_paired_data repeatedly as it
    keeps the DSS file open for all writes. A failure on one record is
    logged and recorded as False; the remaining records are still written.

    Args:
        dss_file: Path to DSS file
        paired_data_records: List of dicts with keys:
            - pathname: DSS pathname
            - x_values: X values (1-D numpy array or convertible sequence)
            - y_values: Y values (1-D, same length as x_values)
            - x_units: (optional) X units, default "HOURS"
            - y_units: (optional) Y units, default "FRACTION"
            - x_label: (optional) X label, default "TIME"
            - y_label: (optional) Y label, default "CUMULATIVE"

    Returns:
        Dict mapping pathname to success status (True/False)

    Raises:
        KeyError: If a record has no 'pathname' key. The lookup happens
            outside the per-record error handling, so the batch stops
            there (the file handle is still released).

    Example:
        >>> records = [
        ...     {"pathname": "//A/B/C///D/", "x_values": x1, "y_values": y1},
        ...     {"pathname": "//A/E/C///D/", "x_values": x2, "y_values": y2},
        ... ]
        >>> results = DssCore.write_multiple_paired_data("file.dss", records)
    """
    # Configure JVM (must be before first jnius import)
    DssCore._configure_jvm()

    # Import Java classes via pyjnius (lazy)
    from jnius import autoclass

    HecDss = autoclass('hec.heclib.dss.HecDss')
    PairedDataContainer = autoclass('hec.io.PairedDataContainer')

    dss_file = str(Path(dss_file).resolve())
    results = {}

    # Open DSS file once for all writes
    dss = HecDss.open(dss_file)

    # Suppress DSS library verbose output; best effort only
    try:
        dss.setMessageLevel(0)
    except Exception:
        pass

    try:
        for record in paired_data_records:
            pathname = record['pathname']
            try:
                # np.asarray also accepts plain lists/tuples, not only ndarrays
                x_arr = np.asarray(record['x_values'], dtype=np.float64)
                y_arr = np.asarray(record['y_values'], dtype=np.float64)

                # Reject mismatched curves instead of writing a bad record
                if x_arr.ndim != 1 or x_arr.shape != y_arr.shape:
                    raise ValueError(
                        "x_values and y_values must be 1-D with equal length "
                        f"(got shapes {x_arr.shape} and {y_arr.shape})"
                    )

                # Create container with setter methods
                container = PairedDataContainer()
                container.setFullName(pathname)
                container.setXUnits(record.get('x_units', 'HOURS'))
                container.setYUnits(record.get('y_units', 'FRACTION'))
                container.setXType(record.get('x_label', 'TIME'))
                container.setYType(record.get('y_label', 'CUMULATIVE'))

                # X ordinates: 1D array (double[])
                container.setXOrdinates(x_arr.tolist())

                # Y ordinates: 2D array (double[numberCurves][numberOrdinates]);
                # a single curve is wrapped in an outer list
                container.setYOrdinates([y_arr.tolist()])
                container.setNumberOrdinates(int(x_arr.size))
                container.setNumberCurves(1)

                # Write to DSS
                dss.put(container)
                results[pathname] = True

            except Exception as e:
                logger.error(f"Error writing {pathname}: {e}")
                results[pathname] = False

        logger.info(f"Wrote {sum(results.values())}/{len(results)} paired data records")

    finally:
        # Release the file handle on success and on every error path
        dss.done()

    return results

read_paired_data(dss_file, pathname) staticmethod

Read paired data (X-Y curve) from DSS file.

Parameters:

Name Type Description Default
dss_file Union[str, Path]

Path to DSS file

required
pathname str

DSS pathname for paired data

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary with:

  - x_values: numpy array of X values
  - y_values: numpy array of Y values
  - x_units: X axis units
  - y_units: Y axis units
  - x_label: X axis label
  - y_label: Y axis label
  - pathname: Original pathname

Optional[Dict[str, Any]]

Returns None if read fails

Example

data = DssCore.read_paired_data(
    "file.dss",
    "//ELEMENT/FLOW-DIVERSION///TABLE/"
)
print(f"X: {data['x_values']}")
print(f"Y: {data['y_values']}")

Source code in hms_commander/dss/core.py
@staticmethod
def read_paired_data(
    dss_file: Union[str, Path],
    pathname: str
) -> Optional[Dict[str, Any]]:
    """
    Read paired data (X-Y curve) from DSS file.

    Only the first curve of the record is returned; any additional
    curves stored under the same pathname are ignored.

    Args:
        dss_file: Path to DSS file
        pathname: DSS pathname for paired data

    Returns:
        Dictionary with:
            - x_values: numpy array of X values
            - y_values: numpy array of Y values (first curve)
            - x_units: X axis units ("" if not set)
            - y_units: Y axis units ("" if not set)
            - x_label: X axis label ("" if not set)
            - y_label: Y axis label ("" if not set)
            - pathname: Original pathname
        Returns None if the file does not exist, the record is missing,
        the record has no X or Y ordinates, or the read fails.

    Example:
        >>> data = DssCore.read_paired_data(
        ...     "file.dss",
        ...     "//ELEMENT/FLOW-DIVERSION///TABLE/"
        ... )
        >>> print(f"X: {data['x_values']}")
        >>> print(f"Y: {data['y_values']}")
    """
    dss_file = Path(dss_file)

    # Check the file first so a missing file does not pay for JVM start-up.
    if not dss_file.exists():
        logger.warning(f"DSS file not found: {dss_file}")
        return None

    # Configure JVM (must be before first jnius import)
    DssCore._configure_jvm()

    # Import Java classes via pyjnius (lazy)
    from jnius import autoclass, cast

    HecDss = autoclass('hec.heclib.dss.HecDss')

    # Open DSS file
    dss = HecDss.open(str(dss_file.resolve()))

    # Suppress DSS library verbose output. Best effort only: a failure here
    # must not prevent the read, but leave a trace for debugging.
    try:
        dss.setMessageLevel(0)
    except Exception as e:
        logger.debug(f"Could not set DSS message level: {e}")

    try:
        # Read paired data
        # True = ignore D-part for wildcard matching
        container = dss.get(pathname, True)

        if container is None:
            logger.warning(f"No data found for pathname: {pathname}")
            return None

        # Cast to PairedDataContainer to access fields
        pdc = cast('hec.io.PairedDataContainer', container)

        # Extract X ordinates (1D array). Read the Java field once and
        # guard against None so we never wrap None in a numpy array.
        x_ordinates = pdc.xOrdinates
        if x_ordinates is None:
            logger.warning(f"No X ordinates found for pathname: {pathname}")
            return None
        x_values = np.array(x_ordinates)

        # Extract Y ordinates (2D array - [curves][ordinates])
        # Most paired data has single curve, so take first curve
        y_ordinates = pdc.yOrdinates
        if y_ordinates is None or len(y_ordinates) == 0:
            logger.warning(f"No Y ordinates found for pathname: {pathname}")
            return None
        y_values = np.array(y_ordinates[0])

        # Extract metadata; unset (None/empty) fields become "".
        return {
            'x_values': x_values,
            'y_values': y_values,
            'x_units': str(pdc.xUnits or ""),
            'y_units': str(pdc.yUnits or ""),
            'x_label': str(pdc.xType or ""),
            'y_label': str(pdc.yType or ""),
            'pathname': pathname
        }

    except Exception as e:
        logger.error(f"Error reading paired data from {pathname}: {e}")
        return None

    finally:
        # Always release the DSS file handle, on success or failure.
        dss.done()

shutdown_jvm() staticmethod

Shutdown Java Virtual Machine.

Note: With pyjnius, JVM shutdown is typically not needed. This is a placeholder for API compatibility.

Source code in hms_commander/dss/core.py
@staticmethod
def shutdown_jvm():
    """
    No-op kept so callers written against a JVM-shutdown API still work.

    pyjnius manages the JVM lifecycle itself, so this method only logs an
    informational message and returns.
    """
    notice = "pyjnius handles JVM lifecycle automatically"
    logger.info(notice)
CLB Engineering Corporation  ·  LLM Forward Engineering
HMS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.