Skip to content

HmsOutput

HEC-HMS compute-output and log parsing helpers.

hms_commander.HmsOutput

HmsOutput - HEC-HMS Compute Output Parsing and Analysis

This module provides static methods for parsing HEC-HMS compute output, log files, and execution results to identify errors, warnings, and execution status.

All methods are static and designed to be used without instantiation.

HmsMessage dataclass

Represents a single HMS log/output message.

Source code in hms_commander/HmsOutput.py
@dataclass
class HmsMessage:
    """Represents a single HMS log/output message."""
    type: str  # NOTE, WARNING, ERROR
    code: int  # Message code (e.g., 10008, 42720)
    message: str  # Message text
    timestamp: Optional[datetime] = None
    raw_line: Optional[str] = None

ComputeResult dataclass

Results from an HMS compute operation.

Source code in hms_commander/HmsOutput.py
@dataclass
class ComputeResult:
    """Results from an HMS compute operation."""
    success: bool
    run_name: Optional[str]
    project_name: Optional[str]
    hms_version: Optional[str]
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    exit_code: int
    notes: List[HmsMessage]
    warnings: List[HmsMessage]
    errors: List[HmsMessage]
    stdout: str
    stderr: str

HmsOutput

Parse and analyze HEC-HMS compute output and log files.

Provides methods for extracting messages, identifying errors, and assessing computation success/failure.

All methods are static - no instantiation required.

Example

from hms_commander import HmsOutput result = HmsOutput.parse_compute_output(stdout, stderr) if not result.success: ... for err in result.errors: ... print(f"ERROR {err.code}: {err.message}")

Source code in hms_commander/HmsOutput.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
class HmsOutput:
    """
    Parse and analyze HEC-HMS compute output and log files.

    Provides methods for extracting messages, identifying errors,
    and assessing computation success/failure.

    All methods are static - no instantiation required.

    Example:
        >>> from hms_commander import HmsOutput
        >>> result = HmsOutput.parse_compute_output(stdout, stderr)
        >>> if not result.success:
        ...     for err in result.errors:
        ...         print(f"ERROR {err.code}: {err.message}")
    """

    # Regex patterns for parsing HMS output
    NOTE_PATTERN = re.compile(r'^NOTE\s+(\d+):\s+(.+)$', re.MULTILINE)
    WARNING_PATTERN = re.compile(r'^WARNING\s+(\d+):\s+(.+)$', re.MULTILINE)
    ERROR_PATTERN = re.compile(r'^ERROR\s+(\d+):\s+(.+)$', re.MULTILINE)

    # Banner patterns
    HMS_START_PATTERN = re.compile(r'^Begin HEC-HMS\s+([\d.]+)\s+(.+)$', re.MULTILINE)
    HMS_END_PATTERN = re.compile(r'^End HEC-HMS\s+([\d.]+)\s+.+;\s+Exit status\s*=\s*(\d+)$', re.MULTILINE)

    # Script patterns
    SCRIPT_START_PATTERN = re.compile(r'NOTE 14650:\s+Run Script:\s+"([^"]+)"')
    SCRIPT_END_PATTERN = re.compile(r'NOTE 12573:\s+End script\s+"[^"]+";\s+Exit code\s+(\d+)')

    # Project patterns
    PROJECT_OPENED_PATTERN = re.compile(
        r'NOTE 10008:\s+Finished opening project "([^"]+)" in directory "([^"]+)" at time (.+)\.'
    )

    # Compute patterns
    COMPUTE_BEGIN_PATTERN = re.compile(
        r'NOTE 10184:\s+Began computing simulation run "([^"]+)" at time (.+)\.'
    )
    COMPUTE_END_PATTERN = re.compile(
        r'NOTE 10185:\s+Finished computing simulation run "([^"]+)" at time (.+)\.'
    )

    # Common message codes
    MESSAGE_CODES = {
        # Informational - Project Operations
        10008: "Project opened (HMS 3.x)",
        10019: "Project opened (HMS 4.x)",
        10179: "Basin model opened",
        10180: "Meteorologic model opened",
        10181: "Control specifications opened",

        # Informational - Compute Operations (HMS 3.x)
        10184: "Compute begin (HMS 3.x)",
        10185: "Compute finished (HMS 3.x)",

        # Informational - Compute Operations (HMS 4.x)
        15301: "Compute begin (HMS 4.x)",
        15302: "Compute finished (HMS 4.x)",
        15312: "Compute runtime summary",

        # Informational - Script Operations
        12573: "Script ended",
        14400: "Background map file not found",
        14650: "Script started",

        # Informational - Model Validation
        20364: "No meteorologic model parameter problems",
        40049: "No basin model parameter problems",
        42413: "Unit hydrograph volume computed",

        # Warnings - Version Upgrade
        10020: "Begin updating project to new version",
        10021: "Project updated to new version",

        # Warnings - Missing Files
        42720: "Basin map file missing",

        # Errors
        10000: "Unknown exception or error",
        10018: "Project file write permission denied",
    }

    # HMS 4.x specific patterns
    PROJECT_OPENED_4X_PATTERN = re.compile(
        r'NOTE 10019:\s+Finished opening project "([^"]+)" in directory "([^"]+)" at time (.+)\.'
    )

    # HMS 4.x compute patterns
    COMPUTE_BEGIN_4X_PATTERN = re.compile(
        r'NOTE 15301:\s+Began computing simulation run "([^"]+)" at time (.+)\.'
    )
    COMPUTE_END_4X_PATTERN = re.compile(
        r'NOTE 15302:\s+Finished computing simulation run "([^"]+)" at time (.+)\.'
    )

    # Version upgrade patterns
    VERSION_UPGRADE_PATTERN = re.compile(
        r'WARNING 10021:\s+Project "([^"]+)" was updated from Version ([\d.]+) to Version ([\d.]+)'
    )

    @staticmethod
    @log_call
    def parse_compute_output(
        stdout: str,
        stderr: str = ""
    ) -> ComputeResult:
        """
        Parse HEC-HMS compute output into structured result.

        Args:
            stdout: Standard output from HMS execution
            stderr: Standard error from HMS execution

        Returns:
            ComputeResult with parsed messages and status

        Example:
            >>> result = HmsOutput.parse_compute_output(stdout, stderr)
            >>> print(f"Success: {result.success}")
            >>> print(f"Warnings: {len(result.warnings)}")
        """
        notes = []
        warnings = []
        errors = []

        # Parse NOTEs
        for match in HmsOutput.NOTE_PATTERN.finditer(stdout):
            code = int(match.group(1))
            message = match.group(2).strip()
            notes.append(HmsMessage(
                type="NOTE",
                code=code,
                message=message,
                raw_line=match.group(0)
            ))

        # Parse WARNINGs
        for match in HmsOutput.WARNING_PATTERN.finditer(stdout):
            code = int(match.group(1))
            message = match.group(2).strip()
            warnings.append(HmsMessage(
                type="WARNING",
                code=code,
                message=message,
                raw_line=match.group(0)
            ))

        # Parse ERRORs
        for match in HmsOutput.ERROR_PATTERN.finditer(stdout):
            code = int(match.group(1))
            message = match.group(2).strip()
            errors.append(HmsMessage(
                type="ERROR",
                code=code,
                message=message,
                raw_line=match.group(0)
            ))

        # Also check stderr for errors
        for match in HmsOutput.ERROR_PATTERN.finditer(stderr):
            code = int(match.group(1))
            message = match.group(2).strip()
            errors.append(HmsMessage(
                type="ERROR",
                code=code,
                message=message,
                raw_line=match.group(0)
            ))

        # Extract HMS version
        hms_version = None
        start_match = HmsOutput.HMS_START_PATTERN.search(stdout)
        if start_match:
            hms_version = start_match.group(1)

        # Extract exit code
        exit_code = -1
        end_match = HmsOutput.HMS_END_PATTERN.search(stdout)
        if end_match:
            exit_code = int(end_match.group(2))

        # Also check for script exit code
        script_end_match = HmsOutput.SCRIPT_END_PATTERN.search(stdout)
        if script_end_match and exit_code == -1:
            exit_code = int(script_end_match.group(1))

        # Extract project name (try HMS 4.x pattern first, then 3.x)
        project_name = None
        project_match = HmsOutput.PROJECT_OPENED_4X_PATTERN.search(stdout)
        if project_match:
            project_name = project_match.group(1)
        else:
            project_match = HmsOutput.PROJECT_OPENED_PATTERN.search(stdout)
            if project_match:
                project_name = project_match.group(1)

        # Extract run name (try HMS 4.x pattern first, then 3.x)
        run_name = None
        compute_match = HmsOutput.COMPUTE_BEGIN_4X_PATTERN.search(stdout)
        if compute_match:
            run_name = compute_match.group(1)
        else:
            compute_match = HmsOutput.COMPUTE_BEGIN_PATTERN.search(stdout)
            if compute_match:
                run_name = compute_match.group(1)

        # Determine success
        # Check for HMS 3.x (10185) or HMS 4.x (15302) completion notes
        computation_finished = any(
            n.code in (10185, 15302) for n in notes
        )
        # Script success if exit code is 0, or if parsing a log file (exit code unavailable = -1)
        script_success = exit_code == 0 or exit_code == -1
        no_errors = len(errors) == 0

        # Success requires: computation finished AND no errors
        # (script_success is only a factor when exit code is explicitly non-zero)
        success = computation_finished and no_errors and (exit_code != 1)

        return ComputeResult(
            success=success,
            run_name=run_name,
            project_name=project_name,
            hms_version=hms_version,
            start_time=None,  # Could parse from timestamps
            end_time=None,
            exit_code=exit_code,
            notes=notes,
            warnings=warnings,
            errors=errors,
            stdout=stdout,
            stderr=stderr
        )

    @staticmethod
    @log_call
    def parse_log_file(
        log_path: Union[str, Path]
    ) -> ComputeResult:
        """
        Parse an HMS log file (.log) into structured result.

        Args:
            log_path: Path to the log file

        Returns:
            ComputeResult with parsed messages

        Example:
            >>> result = HmsOutput.parse_log_file("Run_1.log")
            >>> for note in result.notes:
            ...     print(f"NOTE {note.code}: {note.message}")
        """
        log_path = Path(log_path)

        if not log_path.exists():
            raise FileNotFoundError(f"Log file not found: {log_path}")

        content = log_path.read_text(encoding='utf-8', errors='replace')

        return HmsOutput.parse_compute_output(content, "")

    @staticmethod
    @log_call
    def get_errors(
        stdout: str,
        stderr: str = ""
    ) -> List[HmsMessage]:
        """
        Extract only ERROR messages from compute output.

        Args:
            stdout: Standard output from HMS execution
            stderr: Standard error from HMS execution

        Returns:
            List of error messages
        """
        result = HmsOutput.parse_compute_output(stdout, stderr)
        return result.errors

    @staticmethod
    @log_call
    def get_warnings(
        stdout: str,
        stderr: str = ""
    ) -> List[HmsMessage]:
        """
        Extract only WARNING messages from compute output.

        Args:
            stdout: Standard output from HMS execution
            stderr: Standard error from HMS execution

        Returns:
            List of warning messages
        """
        result = HmsOutput.parse_compute_output(stdout, stderr)
        return result.warnings

    @staticmethod
    @log_call
    def has_fatal_errors(
        stdout: str,
        stderr: str = ""
    ) -> bool:
        """
        Check if output contains fatal errors that prevented computation.

        Args:
            stdout: Standard output from HMS execution
            stderr: Standard error from HMS execution

        Returns:
            True if fatal errors found
        """
        result = HmsOutput.parse_compute_output(stdout, stderr)

        # Check for explicit errors
        if result.errors:
            return True

        # Check for specific failure indicators
        failure_indicators = [
            "Exit status = 1",
            "Exit code 1",
            "Error opening project",
            "Error during computation",
        ]

        combined = stdout + stderr
        for indicator in failure_indicators:
            if indicator in combined:
                return True

        return False

    @staticmethod
    @log_call
    def format_summary(
        result: ComputeResult,
        include_notes: bool = False
    ) -> str:
        """
        Format compute result as human-readable summary.

        Args:
            result: ComputeResult from parse_compute_output
            include_notes: Whether to include NOTE messages

        Returns:
            Formatted summary string

        Example:
            >>> result = HmsOutput.parse_compute_output(stdout, stderr)
            >>> print(HmsOutput.format_summary(result))
        """
        lines = []
        lines.append("=" * 60)
        lines.append("HMS COMPUTE SUMMARY")
        lines.append("=" * 60)

        # Status
        status = "SUCCESS" if result.success else "FAILED"
        lines.append(f"Status: {status}")

        if result.hms_version:
            lines.append(f"HMS Version: {result.hms_version}")
        if result.project_name:
            lines.append(f"Project: {result.project_name}")
        if result.run_name:
            lines.append(f"Run: {result.run_name}")

        lines.append(f"Exit Code: {result.exit_code}")

        # Errors
        if result.errors:
            lines.append("")
            lines.append(f"ERRORS ({len(result.errors)}):")
            lines.append("-" * 40)
            for err in result.errors:
                lines.append(f"  [{err.code}] {err.message}")

        # Warnings
        if result.warnings:
            lines.append("")
            lines.append(f"WARNINGS ({len(result.warnings)}):")
            lines.append("-" * 40)
            for warn in result.warnings:
                lines.append(f"  [{warn.code}] {warn.message}")

        # Notes (optional)
        if include_notes and result.notes:
            lines.append("")
            lines.append(f"NOTES ({len(result.notes)}):")
            lines.append("-" * 40)
            for note in result.notes:
                lines.append(f"  [{note.code}] {note.message}")

        lines.append("=" * 60)

        return "\n".join(lines)

    @staticmethod
    @log_call
    def compare_runs(
        result1: ComputeResult,
        result2: ComputeResult,
        label1: str = "Run 1",
        label2: str = "Run 2"
    ) -> str:
        """
        Compare two compute results and highlight differences.

        Args:
            result1: First ComputeResult
            result2: Second ComputeResult
            label1: Label for first result
            label2: Label for second result

        Returns:
            Formatted comparison string
        """
        lines = []
        lines.append("=" * 60)
        lines.append("HMS COMPUTE COMPARISON")
        lines.append("=" * 60)

        # Status comparison
        lines.append(f"{label1}: {'SUCCESS' if result1.success else 'FAILED'}")
        lines.append(f"{label2}: {'SUCCESS' if result2.success else 'FAILED'}")

        # Error comparison
        lines.append("")
        lines.append(f"Errors: {label1}={len(result1.errors)}, {label2}={len(result2.errors)}")
        lines.append(f"Warnings: {label1}={len(result1.warnings)}, {label2}={len(result2.warnings)}")

        # New errors in result2
        error_codes_1 = {e.code for e in result1.errors}
        error_codes_2 = {e.code for e in result2.errors}

        new_errors = error_codes_2 - error_codes_1
        if new_errors:
            lines.append("")
            lines.append(f"NEW ERRORS in {label2}:")
            for err in result2.errors:
                if err.code in new_errors:
                    lines.append(f"  [{err.code}] {err.message}")

        resolved_errors = error_codes_1 - error_codes_2
        if resolved_errors:
            lines.append("")
            lines.append(f"RESOLVED ERRORS in {label2}:")
            for err in result1.errors:
                if err.code in resolved_errors:
                    lines.append(f"  [{err.code}] {err.message}")

        # New warnings
        warn_codes_1 = {w.code for w in result1.warnings}
        warn_codes_2 = {w.code for w in result2.warnings}

        new_warnings = warn_codes_2 - warn_codes_1
        if new_warnings:
            lines.append("")
            lines.append(f"NEW WARNINGS in {label2}:")
            for warn in result2.warnings:
                if warn.code in new_warnings:
                    lines.append(f"  [{warn.code}] {warn.message}")

        lines.append("=" * 60)

        return "\n".join(lines)

    @staticmethod
    def get_message_description(code: int) -> str:
        """
        Get human-readable description for an HMS message code.

        Args:
            code: HMS message code

        Returns:
            Description string or "Unknown message code"
        """
        return HmsOutput.MESSAGE_CODES.get(code, "Unknown message code")

    @staticmethod
    @log_call
    def is_version_upgrade_error(
        result: ComputeResult
    ) -> Tuple[bool, List[str]]:
        """
        Check if errors are likely due to version upgrade issues.

        Args:
            result: ComputeResult from parse_compute_output

        Returns:
            Tuple of (is_upgrade_error, list of potential issues)

        Example:
            >>> is_upgrade, issues = HmsOutput.is_version_upgrade_error(result)
            >>> if is_upgrade:
            ...     for issue in issues:
            ...         print(f"Upgrade issue: {issue}")
        """
        issues = []

        # Common upgrade-related error patterns
        upgrade_patterns = [
            (r'DSS file.*version', "DSS file version incompatibility"),
            (r'Unknown.*method', "Unknown method - may need parameter update"),
            (r'deprecated', "Deprecated feature"),
            (r'not.*supported', "Feature not supported in this version"),
            (r'cannot.*open.*project', "Project file format incompatibility"),
            (r'unable.*load', "Unable to load component - format change"),
        ]

        combined_text = result.stdout.lower() + result.stderr.lower()
        for error in result.errors:
            combined_text += error.message.lower()

        for pattern, description in upgrade_patterns:
            if re.search(pattern, combined_text, re.IGNORECASE):
                issues.append(description)

        is_upgrade = len(issues) > 0

        return is_upgrade, issues

    @staticmethod
    @log_call
    def parse_project_log(
        project_path: Union[str, Path],
        project_name: Optional[str] = None
    ) -> ComputeResult:
        """
        Parse the project log file from a project directory.

        Args:
            project_path: Path to the project directory
            project_name: Project name (auto-detected if not provided)

        Returns:
            ComputeResult from the project log file

        Example:
            >>> result = HmsOutput.parse_project_log("C:/Projects/MyProject")
            >>> print(HmsOutput.format_summary(result))
        """
        project_path = Path(project_path)

        # Auto-detect project name from .hms file if not provided
        if project_name is None:
            hms_files = list(project_path.glob("*.hms"))
            if hms_files:
                project_name = hms_files[0].stem
            else:
                raise FileNotFoundError(f"No .hms file found in {project_path}")

        log_file = project_path / f"{project_name}.log"
        if not log_file.exists():
            raise FileNotFoundError(f"Project log file not found: {log_file}")

        return HmsOutput.parse_log_file(log_file)

    @staticmethod
    @log_call
    def parse_run_log(
        project_path: Union[str, Path],
        run_name: str
    ) -> ComputeResult:
        """
        Parse a specific run log file.

        Args:
            project_path: Path to the project directory
            run_name: Name of the run

        Returns:
            ComputeResult from the run log file

        Example:
            >>> result = HmsOutput.parse_run_log("C:/Projects/MyProject", "Run 1")
            >>> if result.success:
            ...     print("Run completed successfully")
        """
        project_path = Path(project_path)

        # Try common log file naming patterns
        log_patterns = [
            f"{run_name.replace(' ', '_')}.log",
            f"{run_name}.log",
            f"Run_{run_name.split()[-1]}.log" if run_name.startswith("Run ") else None,
        ]

        for pattern in log_patterns:
            if pattern:
                log_file = project_path / pattern
                if log_file.exists():
                    return HmsOutput.parse_log_file(log_file)

        raise FileNotFoundError(f"Run log file not found for '{run_name}' in {project_path}")

    @staticmethod
    @log_call
    def check_version_upgrade(
        log_content: str
    ) -> Tuple[bool, Optional[str], Optional[str]]:
        """
        Check if a version upgrade occurred during project open.

        Args:
            log_content: Content of the project log file

        Returns:
            Tuple of (upgrade_occurred, from_version, to_version)

        Example:
            >>> upgraded, from_ver, to_ver = HmsOutput.check_version_upgrade(log)
            >>> if upgraded:
            ...     print(f"Project upgraded from {from_ver} to {to_ver}")
        """
        match = HmsOutput.VERSION_UPGRADE_PATTERN.search(log_content)
        if match:
            return True, match.group(2), match.group(3)
        return False, None, None

    @staticmethod
    def get_project_name_from_hms(
        hms_file_path: Union[str, Path]
    ) -> str:
        """
        Extract the project name from an .hms file.

        The project name is on the first line: "Project: <name>"

        Args:
            hms_file_path: Path to the .hms file

        Returns:
            Project name string

        Example:
            >>> name = HmsOutput.get_project_name_from_hms("C:/Projects/tifton/tifton.hms")
            >>> print(name)  # "tifton"
        """
        hms_file_path = Path(hms_file_path)

        if not hms_file_path.exists():
            raise FileNotFoundError(f".hms file not found: {hms_file_path}")

        content = hms_file_path.read_text(encoding='utf-8', errors='replace')

        # Look for "Project: <name>" pattern
        match = re.search(r'^Project:\s*(.+)$', content, re.MULTILINE)
        if match:
            return match.group(1).strip()

        raise ValueError(f"Could not find project name in {hms_file_path}")

parse_compute_output(stdout, stderr='') staticmethod

Parse HEC-HMS compute output into structured result.

Parameters:

Name Type Description Default
stdout str

Standard output from HMS execution

required
stderr str

Standard error from HMS execution

''

Returns:

Type Description
ComputeResult

ComputeResult with parsed messages and status

Example

result = HmsOutput.parse_compute_output(stdout, stderr) print(f"Success: {result.success}") print(f"Warnings: {len(result.warnings)}")

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def parse_compute_output(
    stdout: str,
    stderr: str = ""
) -> ComputeResult:
    """
    Parse HEC-HMS compute output into structured result.

    Args:
        stdout: Standard output from HMS execution
        stderr: Standard error from HMS execution

    Returns:
        ComputeResult with parsed messages and status

    Example:
        >>> result = HmsOutput.parse_compute_output(stdout, stderr)
        >>> print(f"Success: {result.success}")
        >>> print(f"Warnings: {len(result.warnings)}")
    """
    notes = []
    warnings = []
    errors = []

    # Parse NOTEs
    for match in HmsOutput.NOTE_PATTERN.finditer(stdout):
        code = int(match.group(1))
        message = match.group(2).strip()
        notes.append(HmsMessage(
            type="NOTE",
            code=code,
            message=message,
            raw_line=match.group(0)
        ))

    # Parse WARNINGs
    for match in HmsOutput.WARNING_PATTERN.finditer(stdout):
        code = int(match.group(1))
        message = match.group(2).strip()
        warnings.append(HmsMessage(
            type="WARNING",
            code=code,
            message=message,
            raw_line=match.group(0)
        ))

    # Parse ERRORs
    for match in HmsOutput.ERROR_PATTERN.finditer(stdout):
        code = int(match.group(1))
        message = match.group(2).strip()
        errors.append(HmsMessage(
            type="ERROR",
            code=code,
            message=message,
            raw_line=match.group(0)
        ))

    # Also check stderr for errors
    for match in HmsOutput.ERROR_PATTERN.finditer(stderr):
        code = int(match.group(1))
        message = match.group(2).strip()
        errors.append(HmsMessage(
            type="ERROR",
            code=code,
            message=message,
            raw_line=match.group(0)
        ))

    # Extract HMS version
    hms_version = None
    start_match = HmsOutput.HMS_START_PATTERN.search(stdout)
    if start_match:
        hms_version = start_match.group(1)

    # Extract exit code
    exit_code = -1
    end_match = HmsOutput.HMS_END_PATTERN.search(stdout)
    if end_match:
        exit_code = int(end_match.group(2))

    # Also check for script exit code
    script_end_match = HmsOutput.SCRIPT_END_PATTERN.search(stdout)
    if script_end_match and exit_code == -1:
        exit_code = int(script_end_match.group(1))

    # Extract project name (try HMS 4.x pattern first, then 3.x)
    project_name = None
    project_match = HmsOutput.PROJECT_OPENED_4X_PATTERN.search(stdout)
    if project_match:
        project_name = project_match.group(1)
    else:
        project_match = HmsOutput.PROJECT_OPENED_PATTERN.search(stdout)
        if project_match:
            project_name = project_match.group(1)

    # Extract run name (try HMS 4.x pattern first, then 3.x)
    run_name = None
    compute_match = HmsOutput.COMPUTE_BEGIN_4X_PATTERN.search(stdout)
    if compute_match:
        run_name = compute_match.group(1)
    else:
        compute_match = HmsOutput.COMPUTE_BEGIN_PATTERN.search(stdout)
        if compute_match:
            run_name = compute_match.group(1)

    # Determine success
    # Check for HMS 3.x (10185) or HMS 4.x (15302) completion notes
    computation_finished = any(
        n.code in (10185, 15302) for n in notes
    )
    # Script success if exit code is 0, or if parsing a log file (exit code unavailable = -1)
    script_success = exit_code == 0 or exit_code == -1
    no_errors = len(errors) == 0

    # Success requires: computation finished AND no errors
    # (script_success is only a factor when exit code is explicitly non-zero)
    success = computation_finished and no_errors and (exit_code != 1)

    return ComputeResult(
        success=success,
        run_name=run_name,
        project_name=project_name,
        hms_version=hms_version,
        start_time=None,  # Could parse from timestamps
        end_time=None,
        exit_code=exit_code,
        notes=notes,
        warnings=warnings,
        errors=errors,
        stdout=stdout,
        stderr=stderr
    )

parse_log_file(log_path) staticmethod

Parse an HMS log file (.log) into structured result.

Parameters:

Name Type Description Default
log_path Union[str, Path]

Path to the log file

required

Returns:

Type Description
ComputeResult

ComputeResult with parsed messages

Example

result = HmsOutput.parse_log_file("Run_1.log") for note in result.notes: ... print(f"NOTE {note.code}: {note.message}")

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def parse_log_file(
    log_path: Union[str, Path]
) -> ComputeResult:
    """
    Parse an HMS log file (.log) into structured result.

    Args:
        log_path: Path to the log file

    Returns:
        ComputeResult with parsed messages

    Example:
        >>> result = HmsOutput.parse_log_file("Run_1.log")
        >>> for note in result.notes:
        ...     print(f"NOTE {note.code}: {note.message}")
    """
    log_path = Path(log_path)

    if not log_path.exists():
        raise FileNotFoundError(f"Log file not found: {log_path}")

    content = log_path.read_text(encoding='utf-8', errors='replace')

    return HmsOutput.parse_compute_output(content, "")

get_errors(stdout, stderr='') staticmethod

Extract only ERROR messages from compute output.

Parameters:

Name Type Description Default
stdout str

Standard output from HMS execution

required
stderr str

Standard error from HMS execution

''

Returns:

Type Description
List[HmsMessage]

List of error messages

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def get_errors(
    stdout: str,
    stderr: str = ""
) -> List[HmsMessage]:
    """
    Extract only ERROR messages from compute output.

    Args:
        stdout: Standard output from HMS execution
        stderr: Standard error from HMS execution

    Returns:
        List of error messages
    """
    result = HmsOutput.parse_compute_output(stdout, stderr)
    return result.errors

get_warnings(stdout, stderr='') staticmethod

Extract only WARNING messages from compute output.

Parameters:

Name Type Description Default
stdout str

Standard output from HMS execution

required
stderr str

Standard error from HMS execution

''

Returns:

Type Description
List[HmsMessage]

List of warning messages

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def get_warnings(
    stdout: str,
    stderr: str = ""
) -> List[HmsMessage]:
    """
    Extract only WARNING messages from compute output.

    Args:
        stdout: Standard output from HMS execution
        stderr: Standard error from HMS execution

    Returns:
        List of warning messages
    """
    result = HmsOutput.parse_compute_output(stdout, stderr)
    return result.warnings

has_fatal_errors(stdout, stderr='') staticmethod

Check if output contains fatal errors that prevented computation.

Parameters:

Name Type Description Default
stdout str

Standard output from HMS execution

required
stderr str

Standard error from HMS execution

''

Returns:

Type Description
bool

True if fatal errors found

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def has_fatal_errors(
    stdout: str,
    stderr: str = ""
) -> bool:
    """
    Check if output contains fatal errors that prevented computation.

    Args:
        stdout: Standard output from HMS execution
        stderr: Standard error from HMS execution

    Returns:
        True if fatal errors found
    """
    result = HmsOutput.parse_compute_output(stdout, stderr)

    # Check for explicit errors
    if result.errors:
        return True

    # Check for specific failure indicators
    failure_indicators = [
        "Exit status = 1",
        "Exit code 1",
        "Error opening project",
        "Error during computation",
    ]

    combined = stdout + stderr
    for indicator in failure_indicators:
        if indicator in combined:
            return True

    return False

format_summary(result, include_notes=False) staticmethod

Format compute result as human-readable summary.

Parameters:

Name Type Description Default
result ComputeResult

ComputeResult from parse_compute_output

required
include_notes bool

Whether to include NOTE messages

False

Returns:

Type Description
str

Formatted summary string

Example

result = HmsOutput.parse_compute_output(stdout, stderr) print(HmsOutput.format_summary(result))

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def format_summary(
    result: ComputeResult,
    include_notes: bool = False
) -> str:
    """
    Format compute result as human-readable summary.

    Args:
        result: ComputeResult from parse_compute_output
        include_notes: Whether to include NOTE messages

    Returns:
        Formatted summary string

    Example:
        >>> result = HmsOutput.parse_compute_output(stdout, stderr)
        >>> print(HmsOutput.format_summary(result))
    """
    lines = []
    lines.append("=" * 60)
    lines.append("HMS COMPUTE SUMMARY")
    lines.append("=" * 60)

    # Status
    status = "SUCCESS" if result.success else "FAILED"
    lines.append(f"Status: {status}")

    if result.hms_version:
        lines.append(f"HMS Version: {result.hms_version}")
    if result.project_name:
        lines.append(f"Project: {result.project_name}")
    if result.run_name:
        lines.append(f"Run: {result.run_name}")

    lines.append(f"Exit Code: {result.exit_code}")

    # Errors
    if result.errors:
        lines.append("")
        lines.append(f"ERRORS ({len(result.errors)}):")
        lines.append("-" * 40)
        for err in result.errors:
            lines.append(f"  [{err.code}] {err.message}")

    # Warnings
    if result.warnings:
        lines.append("")
        lines.append(f"WARNINGS ({len(result.warnings)}):")
        lines.append("-" * 40)
        for warn in result.warnings:
            lines.append(f"  [{warn.code}] {warn.message}")

    # Notes (optional)
    if include_notes and result.notes:
        lines.append("")
        lines.append(f"NOTES ({len(result.notes)}):")
        lines.append("-" * 40)
        for note in result.notes:
            lines.append(f"  [{note.code}] {note.message}")

    lines.append("=" * 60)

    return "\n".join(lines)

compare_runs(result1, result2, label1='Run 1', label2='Run 2') staticmethod

Compare two compute results and highlight differences.

Parameters:

Name Type Description Default
result1 ComputeResult

First ComputeResult

required
result2 ComputeResult

Second ComputeResult

required
label1 str

Label for first result

'Run 1'
label2 str

Label for second result

'Run 2'

Returns:

Type Description
str

Formatted comparison string

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def compare_runs(
    result1: ComputeResult,
    result2: ComputeResult,
    label1: str = "Run 1",
    label2: str = "Run 2"
) -> str:
    """
    Compare two compute results and highlight differences.

    Args:
        result1: First ComputeResult
        result2: Second ComputeResult
        label1: Label for first result
        label2: Label for second result

    Returns:
        Formatted comparison string
    """
    lines = []
    lines.append("=" * 60)
    lines.append("HMS COMPUTE COMPARISON")
    lines.append("=" * 60)

    # Status comparison
    lines.append(f"{label1}: {'SUCCESS' if result1.success else 'FAILED'}")
    lines.append(f"{label2}: {'SUCCESS' if result2.success else 'FAILED'}")

    # Error comparison
    lines.append("")
    lines.append(f"Errors: {label1}={len(result1.errors)}, {label2}={len(result2.errors)}")
    lines.append(f"Warnings: {label1}={len(result1.warnings)}, {label2}={len(result2.warnings)}")

    # New errors in result2
    error_codes_1 = {e.code for e in result1.errors}
    error_codes_2 = {e.code for e in result2.errors}

    new_errors = error_codes_2 - error_codes_1
    if new_errors:
        lines.append("")
        lines.append(f"NEW ERRORS in {label2}:")
        for err in result2.errors:
            if err.code in new_errors:
                lines.append(f"  [{err.code}] {err.message}")

    resolved_errors = error_codes_1 - error_codes_2
    if resolved_errors:
        lines.append("")
        lines.append(f"RESOLVED ERRORS in {label2}:")
        for err in result1.errors:
            if err.code in resolved_errors:
                lines.append(f"  [{err.code}] {err.message}")

    # New warnings
    warn_codes_1 = {w.code for w in result1.warnings}
    warn_codes_2 = {w.code for w in result2.warnings}

    new_warnings = warn_codes_2 - warn_codes_1
    if new_warnings:
        lines.append("")
        lines.append(f"NEW WARNINGS in {label2}:")
        for warn in result2.warnings:
            if warn.code in new_warnings:
                lines.append(f"  [{warn.code}] {warn.message}")

    lines.append("=" * 60)

    return "\n".join(lines)

get_message_description(code) staticmethod

Get human-readable description for an HMS message code.

Parameters:

Name Type Description Default
code int

HMS message code

required

Returns:

Type Description
str

Description string or "Unknown message code"

Source code in hms_commander/HmsOutput.py
@staticmethod
def get_message_description(code: int) -> str:
    """
    Get human-readable description for an HMS message code.

    Args:
        code: HMS message code

    Returns:
        Description string or "Unknown message code"
    """
    return HmsOutput.MESSAGE_CODES.get(code, "Unknown message code")

is_version_upgrade_error(result) staticmethod

Check if errors are likely due to version upgrade issues.

Parameters:

Name Type Description Default
result ComputeResult

ComputeResult from parse_compute_output

required

Returns:

Type Description
Tuple[bool, List[str]]

Tuple of (is_upgrade_error, list of potential issues)

Example

is_upgrade, issues = HmsOutput.is_version_upgrade_error(result) if is_upgrade: ... for issue in issues: ... print(f"Upgrade issue: {issue}")

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def is_version_upgrade_error(
    result: ComputeResult
) -> Tuple[bool, List[str]]:
    """
    Check if errors are likely due to version upgrade issues.

    Args:
        result: ComputeResult from parse_compute_output

    Returns:
        Tuple of (is_upgrade_error, list of potential issues)

    Example:
        >>> is_upgrade, issues = HmsOutput.is_version_upgrade_error(result)
        >>> if is_upgrade:
        ...     for issue in issues:
        ...         print(f"Upgrade issue: {issue}")
    """
    issues = []

    # Common upgrade-related error patterns
    upgrade_patterns = [
        (r'DSS file.*version', "DSS file version incompatibility"),
        (r'Unknown.*method', "Unknown method - may need parameter update"),
        (r'deprecated', "Deprecated feature"),
        (r'not.*supported', "Feature not supported in this version"),
        (r'cannot.*open.*project', "Project file format incompatibility"),
        (r'unable.*load', "Unable to load component - format change"),
    ]

    combined_text = result.stdout.lower() + result.stderr.lower()
    for error in result.errors:
        combined_text += error.message.lower()

    for pattern, description in upgrade_patterns:
        if re.search(pattern, combined_text, re.IGNORECASE):
            issues.append(description)

    is_upgrade = len(issues) > 0

    return is_upgrade, issues

parse_project_log(project_path, project_name=None) staticmethod

Parse the project log file from a project directory.

Parameters:

Name Type Description Default
project_path Union[str, Path]

Path to the project directory

required
project_name Optional[str]

Project name (auto-detected if not provided)

None

Returns:

Type Description
ComputeResult

ComputeResult from the project log file

Example

result = HmsOutput.parse_project_log("C:/Projects/MyProject") print(HmsOutput.format_summary(result))

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def parse_project_log(
    project_path: Union[str, Path],
    project_name: Optional[str] = None
) -> ComputeResult:
    """
    Parse the project log file from a project directory.

    Args:
        project_path: Path to the project directory
        project_name: Project name (auto-detected if not provided)

    Returns:
        ComputeResult from the project log file

    Example:
        >>> result = HmsOutput.parse_project_log("C:/Projects/MyProject")
        >>> print(HmsOutput.format_summary(result))
    """
    project_path = Path(project_path)

    # Auto-detect project name from .hms file if not provided
    if project_name is None:
        hms_files = list(project_path.glob("*.hms"))
        if hms_files:
            project_name = hms_files[0].stem
        else:
            raise FileNotFoundError(f"No .hms file found in {project_path}")

    log_file = project_path / f"{project_name}.log"
    if not log_file.exists():
        raise FileNotFoundError(f"Project log file not found: {log_file}")

    return HmsOutput.parse_log_file(log_file)

parse_run_log(project_path, run_name) staticmethod

Parse a specific run log file.

Parameters:

Name Type Description Default
project_path Union[str, Path]

Path to the project directory

required
run_name str

Name of the run

required

Returns:

Type Description
ComputeResult

ComputeResult from the run log file

Example

result = HmsOutput.parse_run_log("C:/Projects/MyProject", "Run 1") if result.success: ... print("Run completed successfully")

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def parse_run_log(
    project_path: Union[str, Path],
    run_name: str
) -> ComputeResult:
    """
    Parse a specific run log file.

    Args:
        project_path: Path to the project directory
        run_name: Name of the run

    Returns:
        ComputeResult from the run log file

    Example:
        >>> result = HmsOutput.parse_run_log("C:/Projects/MyProject", "Run 1")
        >>> if result.success:
        ...     print("Run completed successfully")
    """
    project_path = Path(project_path)

    # Try common log file naming patterns
    log_patterns = [
        f"{run_name.replace(' ', '_')}.log",
        f"{run_name}.log",
        f"Run_{run_name.split()[-1]}.log" if run_name.startswith("Run ") else None,
    ]

    for pattern in log_patterns:
        if pattern:
            log_file = project_path / pattern
            if log_file.exists():
                return HmsOutput.parse_log_file(log_file)

    raise FileNotFoundError(f"Run log file not found for '{run_name}' in {project_path}")

check_version_upgrade(log_content) staticmethod

Check if a version upgrade occurred during project open.

Parameters:

Name Type Description Default
log_content str

Content of the project log file

required

Returns:

Type Description
Tuple[bool, Optional[str], Optional[str]]

Tuple of (upgrade_occurred, from_version, to_version)

Example

upgraded, from_ver, to_ver = HmsOutput.check_version_upgrade(log) if upgraded: ... print(f"Project upgraded from {from_ver} to {to_ver}")

Source code in hms_commander/HmsOutput.py
@staticmethod
@log_call
def check_version_upgrade(
    log_content: str
) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Check if a version upgrade occurred during project open.

    Args:
        log_content: Content of the project log file

    Returns:
        Tuple of (upgrade_occurred, from_version, to_version)

    Example:
        >>> upgraded, from_ver, to_ver = HmsOutput.check_version_upgrade(log)
        >>> if upgraded:
        ...     print(f"Project upgraded from {from_ver} to {to_ver}")
    """
    match = HmsOutput.VERSION_UPGRADE_PATTERN.search(log_content)
    if match:
        return True, match.group(2), match.group(3)
    return False, None, None

get_project_name_from_hms(hms_file_path) staticmethod

Extract the project name from an .hms file.

The project name is on the first line: "Project: "

Parameters:

Name Type Description Default
hms_file_path Union[str, Path]

Path to the .hms file

required

Returns:

Type Description
str

Project name string

Example

name = HmsOutput.get_project_name_from_hms("C:/Projects/tifton/tifton.hms") print(name) # "tifton"

Source code in hms_commander/HmsOutput.py
@staticmethod
def get_project_name_from_hms(
    hms_file_path: Union[str, Path]
) -> str:
    """
    Extract the project name from an .hms file.

    The project name is on the first line: "Project: <name>"

    Args:
        hms_file_path: Path to the .hms file

    Returns:
        Project name string

    Example:
        >>> name = HmsOutput.get_project_name_from_hms("C:/Projects/tifton/tifton.hms")
        >>> print(name)  # "tifton"
    """
    hms_file_path = Path(hms_file_path)

    if not hms_file_path.exists():
        raise FileNotFoundError(f".hms file not found: {hms_file_path}")

    content = hms_file_path.read_text(encoding='utf-8', errors='replace')

    # Look for "Project: <name>" pattern
    match = re.search(r'^Project:\s*(.+)$', content, re.MULTILINE)
    if match:
        return match.group(1).strip()

    raise ValueError(f"Could not find project name in {hms_file_path}")

ComputeResult

hms_commander.ComputeResult dataclass

Results from an HMS compute operation.

Source code in hms_commander/HmsOutput.py
@dataclass
class ComputeResult:
    """Results from an HMS compute operation."""
    success: bool
    run_name: Optional[str]
    project_name: Optional[str]
    hms_version: Optional[str]
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    exit_code: int
    notes: List[HmsMessage]
    warnings: List[HmsMessage]
    errors: List[HmsMessage]
    stdout: str
    stderr: str

HmsMessage

hms_commander.HmsMessage dataclass

Represents a single HMS log/output message.

Source code in hms_commander/HmsOutput.py
@dataclass
class HmsMessage:
    """Represents a single HMS log/output message."""
    type: str  # NOTE, WARNING, ERROR
    code: int  # Message code (e.g., 10008, 42720)
    message: str  # Message text
    timestamp: Optional[datetime] = None
    raw_line: Optional[str] = None
CLB Engineering Corporation  ·  LLM Forward Engineering
HMS Commander is a free and open-source project maintained by CLB Engineering Corporation. For agencies and firms seeking to modernize H&H workflows with LLM Forward approaches, contact CLB to partner with the engineers who wrote the automation.