Skip to content

Reference

Define adapter / helper classes to hide unrelated functionality in.

SourceCodeFixer

Adapter that holds all source code yaml fixers.

Source code in yamlfix/adapters.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
class SourceCodeFixer:
    """Adapter that holds all source code yaml fixers."""

    def __init__(self, yaml: Yaml, config: Optional[YamlfixConfig]) -> None:
        """Initialize the source code fixer adapter with a configured yaml fixer \
            instance and optional yamlfix config.

        Args:
            yaml: Initialized Ruamel formatter to use for source code correction.
            config: Small set of user provided configuration options for yamlfix.
        """
        self.yaml = yaml.yaml
        self.config = config or YamlfixConfig()

    def fix(self, source_code: str) -> str:
        """Run all yaml source code fixers.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Running source code fixers...")

        fixers = [
            self._fix_truthy_strings,
            self._fix_jinja_variables,
            self._ruamel_yaml_fixer,
            self._restore_truthy_strings,
            self._restore_jinja_variables,
            self._restore_double_exclamations,
            self._fix_comments,
            self._fix_flow_style_lists,
            self._fix_whitelines,
            self._fix_top_level_lists,
            self._add_newline_at_end_of_file,
        ]

        for fixer in fixers:
            source_code = fixer(source_code)

        return source_code

    def _ruamel_yaml_fixer(self, source_code: str) -> str:
        """Run Ruamel's yaml fixer.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Running ruamel yaml fixer...")
        source_dicts = self.yaml.load_all(source_code)

        # Return the output to a string
        string_stream = StringIO()
        for source_dict in source_dicts:
            self.yaml.dump(source_dict, string_stream)
            source_code = string_stream.getvalue()
        string_stream.close()

        return source_code.strip()

    @staticmethod
    def _fix_top_level_lists(source_code: str) -> str:
        """Deindent the source with a top level list.

        Documents like the following:

        ```yaml
        ---
        # Comment
        - item 1
        - item 2
        ```

        Are wrongly indented by the ruyaml parser:

        ```yaml
        ---
        # Comment
        - item 1
        - item 2
        ```

        This function restores the indentation back to the original.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Fixing top level lists...")
        source_lines = source_code.splitlines()
        fixed_source_lines: List[str] = []
        is_top_level_list: Optional[bool] = None

        indent: str = ""
        for line in source_lines:
            # Skip the heading and first empty lines
            if re.match(r"^(---|#.*|)$", line):
                fixed_source_lines.append(line)
                continue

            # Check if the first valid line is an indented list item
            if re.match(r"\s*- +.*", line) and is_top_level_list is None:
                is_top_level_list = True

                # Extract the indentation level
                serialized_line = re.match(r"(?P<indent>\s*)- +(?P<content>.*)", line)
                if serialized_line is None:  # pragma: no cover
                    raise ValueError(
                        f"Error extracting the indentation of line: {line}"
                    )
                indent = serialized_line.groupdict()["indent"]

                # Remove the indentation from the line
                fixed_source_lines.append(re.sub(rf"^{indent}(.*)", r"\1", line))
            elif is_top_level_list:
                # ruyaml doesn't change the indentation of comments
                if re.match(r"\s*#.*", line):
                    fixed_source_lines.append(line)
                else:
                    fixed_source_lines.append(re.sub(rf"^{indent}(.*)", r"\1", line))
            else:
                return source_code

        return "\n".join(fixed_source_lines)

    @staticmethod
    def _fix_flow_style_lists(source_code: str) -> str:
        """Fix trailing newlines within flow-style lists.

        Documents like the following:

        ```yaml
        ---
        list: ["a", b, 'c']


        next-element: "d"
        ```

        Are wrongly formatted by the ruyaml parser:

        ```yaml
        ---
        list: ["a", b, 'c'


        ]
        next-element: "d"
        ```

        This function moves the closing bracket to the end of the flow-style
        list definition and positions the newlines after the closing bracket.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Fixing flow-style lists...")
        pattern = r"\[(?P<items>.*)(?P<newlines>\n+)]"
        replacement = r"[\g<items>]\g<newlines>"
        return re.sub(pattern, repl=replacement, string=source_code)

    @staticmethod
    def _fix_truthy_strings(source_code: str) -> str:
        """Convert common strings that refer to booleans.

        All caps variations of true, yes and on are transformed to true, while false,
        no and off are transformed to false.

        Ruyaml understands these strings and converts them to the lower version of
        the word instead of converting them to true and false.

        [More info](https://yamllint.readthedocs.io/en/stable/rules.html#module-yamllint.rules.truthy) # noqa: E501

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Fixing truthy strings...")
        source_lines = source_code.splitlines()
        fixed_source_lines: List[str] = []

        for line in source_lines:
            line_contains_true = re.match(
                r"(?P<pre_boolean_text>.*(:|-) )(true|yes|on)$", line, re.IGNORECASE
            )
            line_contains_false = re.match(
                r"(?P<pre_boolean_text>.*(:|-) )(false|no|off)$", line, re.IGNORECASE
            )

            if line_contains_true:
                fixed_source_lines.append(
                    f"{line_contains_true.groupdict()['pre_boolean_text']}true"
                )
            elif line_contains_false:
                fixed_source_lines.append(
                    f"{line_contains_false.groupdict()['pre_boolean_text']}false"
                )
            else:
                fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

    @staticmethod
    def _restore_truthy_strings(source_code: str) -> str:
        """Restore truthy strings to strings.

        The Ruyaml parser removes the apostrophes of all the caps variations of
        the strings 'yes', 'on', no and 'off' as it interprets them as booleans.

        As this function is run after _fix_truthy_strings, those strings are
        meant to be strings. So we're turning them back from booleans to strings.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Restoring truthy strings...")
        source_lines = source_code.splitlines()
        fixed_source_lines: List[str] = []

        for line in source_lines:
            line_contains_valid_truthy_string = re.match(
                r"(?P<pre_boolean_text>.*(:|-) )(?P<boolean_text>yes|on|no|off)$",
                line,
                re.IGNORECASE,
            )
            if line_contains_valid_truthy_string:
                fixed_source_lines.append(
                    f"{line_contains_valid_truthy_string.groupdict()['pre_boolean_text']}"  # noqa: E501
                    f"'{line_contains_valid_truthy_string.groupdict()['boolean_text']}'"
                )
            else:
                fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

    def _fix_comments(self, source_code: str) -> str:
        log.debug("Fixing comments...")
        config = self.config
        comment_start = " " * config.comments_min_spaces_from_content + "#"

        fixed_source_lines = []

        for line in source_code.splitlines():
            # Comment at the start of the line
            if config.comments_require_starting_space and re.search(r"(^|\s)#\w", line):
                line = line.replace("#", "# ")
            # Comment in the middle of the line, but it's not part of a string
            if (
                config.comments_min_spaces_from_content > 1
                and " #" in line
                and line[-1] not in ["'", '"']
            ):
                line = re.sub(r"(.+\S)(\s+?)#", rf"\1{comment_start}", line)
            fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

    def _fix_whitelines(self, source_code: str) -> str:
        """Fixes number of consecutive whitelines.

        Before a line that only includes a comment, either:
          - 0 whiteline is allowed
          - Exactly `self.config.comments_whitelines` whitelines are allowed

        This method also adjusts amount of whitelines that are not immediately followed
        by a comment.

        Args:
            self: Source code to be corrected.

        Returns:
            Source code with appropriate whitelines standards.
        """
        config = self.config
        n_whitelines = config.whitelines
        n_whitelines_from_content = config.comments_whitelines

        re_whitelines_with_comments = "\n\n+[\t ]{0,}[#]"
        re_whitelines_with_no_comments = "\n\n+[\t ]{0,}[^#\n\t ]"

        adjust_whitelines = partial(self._replace_whitelines, n_whitelines=n_whitelines)
        replace_by_n_whitelines = partial(
            self._replace_whitelines,
            n_whitelines=n_whitelines_from_content,
        )

        source_code = re.sub(
            pattern=re_whitelines_with_no_comments,
            repl=adjust_whitelines,
            string=source_code,
        )
        source_code = self._fix_section_whitelines(source_code)
        source_code = re.sub(
            pattern=re_whitelines_with_comments,
            repl=replace_by_n_whitelines,
            string=source_code,
        )

        return source_code

    @staticmethod
    def _replace_whitelines(match: Match[str], n_whitelines: int) -> str:
        """Replaces whitelines by a fixed number, `n_whitelines`, of whitelines.

        Method used by `SourceCodeFixer._fix_whitelines()` to replace whitelines when
        whitelines are not followed by a comment.

        Args:
            match: The matched expression by the regex module, `re`
            n_whitelines: Desired number of whitelines to use to replace all leading
            whitelines in `match`

        Returns:
            A string corresponding to the matched string with its leading whitelines
            replaced by `n_whitelines` whitelines.
        """
        matched_str = match.group()
        adjusted_matched_str = "\n" * (n_whitelines + 1) + matched_str.lstrip("\n")

        return adjusted_matched_str

    def _fix_section_whitelines(self, source_code: str) -> str:
        """Adjust the number of whitelines before and after each section.

        A section, as matched by the regular expression below, is a line that
        does not start with a space and ends with a colon, optionally preceded
        by unindented comment lines, and followed by lines that are either
        empty or start with two spaces. The newlines around it are part of the
        match.

        The source is processed twice with the same pattern: first to fix the
        newlines in front of every section that is not at the beginning, then
        to fix the newlines after every section. Finally the trailing newlines
        of the whole result are reduced to at most one.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Source code with the whitelines around sections adjusted according
            to `self.config.section_whitelines` and `self.config.whitelines`.
        """
        # Leading newlines, optional unindented comment lines, the section
        # header (no leading space, ends with a colon), its body (empty lines
        # or lines starting with two spaces) and the trailing newlines
        re_section = "\n*(^#.*\n)*\n*^[^ ].*:\n(\n|(^  .*))+\n*"

        # Match the first --- or start of the string \A
        # See: https://docs.python.org/3.9/library/re.html#regular-expression-syntax
        # Group `b` holds a section at the beginning, group `s` any other one
        re_beginning_section = f"(?P<b>(?:---\n|\\A){re_section})"
        re_normal_section = f"(?P<s>{re_section})"
        re_full = f"{re_beginning_section}|{re_normal_section}"
        pattern = re.compile(re_full, flags=re.MULTILINE)
        n_whitelines = self.config.whitelines
        n_section_whitelines = self.config.section_whitelines

        def _fix_before_section(match: Match[str]) -> str:
            # Replace the newlines in front of a section by a fixed amount
            whitelines = n_section_whitelines
            section = match.group("s")
            # Sections at the beginning (group `b`) are returned untouched
            if not section:
                return match.group()
            # Use the general whitelines setting when it's the bigger one and
            # the section already starts with at least that many whitelines
            if n_whitelines > n_section_whitelines and section.startswith(
                "\n" + n_whitelines * "\n"
            ):
                whitelines = n_whitelines
            # Drop all the leading newlines before adding the desired amount
            while section[0] == "\n":
                section = section[1:]
            return "\n" * (whitelines + 1) + section

        def _fix_after_section(match: Match[str]) -> str:
            # Replace the newlines at the end of a section by a fixed amount
            whitelines = n_section_whitelines
            section = match.group("b") or match.group("s")
            # Same rule as above, checked against the end of the section
            if n_whitelines > n_section_whitelines and section.endswith(
                "\n\n" + n_whitelines * "\n"
            ):
                whitelines = n_whitelines
            # Drop all the trailing newlines before adding the desired amount
            while section[-1] == "\n":
                section = section[:-1]
            return section + "\n" * (whitelines + 1)

        # Two passes over the same pattern: the second one runs on the result
        # of the first one
        before_fixed = pattern.sub(repl=_fix_before_section, string=source_code)
        after_fixed = pattern.sub(repl=_fix_after_section, string=before_fixed)
        # Leave at most one newline at the end of the source
        while after_fixed[-2:] == "\n\n":
            after_fixed = after_fixed[:-1]
        return after_fixed

    @staticmethod
    def _restore_double_exclamations(source_code: str) -> str:
        """Restore the double exclamation marks.

        The Ruyaml parser transforms the !!python statement to !%21python which breaks
        some programs.
        """
        log.debug("Restoring double exclamations...")
        fixed_source_lines = []
        double_exclamation = re.compile(r"!%21")

        for line in source_code.splitlines():
            if double_exclamation.search(line):
                line = line.replace(r"!%21", "!!")
            fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

    @staticmethod
    def _add_newline_at_end_of_file(source_code: str) -> str:
        """Ensures that the file ends with exactly one newline.

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        return source_code.rstrip() + "\n"

    @staticmethod
    def _fix_jinja_variables(source_code: str) -> str:
        """Remove spaces between jinja variables.

        So that they are not split in many lines by ruyaml

        Args:
            source_code: Source code to be corrected.

        Returns:
            Corrected source code.
        """
        log.debug("Fixing jinja2 variables...")
        source_lines = source_code.splitlines()
        fixed_source_lines: List[str] = []

        for line in source_lines:
            line_contains_jinja2_variable = re.search(r"{{.*}}", line)

            if line_contains_jinja2_variable:
                line = SourceCodeFixer._encode_jinja2_line(line)

            fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

    @staticmethod
    def _encode_jinja2_line(line: str) -> str:
        """Encode jinja variables so that they are not split.

        Using a special character to join the elements inside the {{ }}, so that
        they are all taken as the same word, and ruyamel doesn't split them.
        """
        new_line = []
        variable_terms: List[str] = []

        for word in line.split(" "):
            if re.search("}}", word):
                variable_terms.append(word)
                new_line.append("★".join(variable_terms))
                variable_terms = []
            elif re.search("{{", word) or len(variable_terms) > 0:
                variable_terms.append(word)
            else:
                new_line.append(word)

        return " ".join(new_line)

    @staticmethod
    def _restore_jinja_variables(source_code: str) -> str:
        """Restore the jinja2 variables to their original state.

        Remove the encoding introduced by _fix_jinja_variables to prevent ruyaml
        to split the variables.
        """
        log.debug("Restoring jinja2 variables...")
        fixed_source_lines = []

        for line in source_code.splitlines():
            line_contains_jinja2_variable = re.search(r"{{.*}}", line)

            if line_contains_jinja2_variable:
                line = line.replace("★", " ")

            fixed_source_lines.append(line)

        return "\n".join(fixed_source_lines)

__init__(yaml, config)

Initialize the source code fixer adapter with a configured yaml fixer instance and optional yamlfix config.

Parameters:

Name Type Description Default
yaml Yaml

Initialized Ruamel formatter to use for source code correction.

required
config Optional[YamlfixConfig]

Small set of user provided configuration options for yamlfix.

required
Source code in yamlfix/adapters.py
326
327
328
329
330
331
332
333
334
335
def __init__(self, yaml: Yaml, config: Optional[YamlfixConfig]) -> None:
    """Store the underlying yaml formatter and the active configuration.

    Args:
        yaml: Yaml adapter whose `yaml` attribute is the initialized Ruamel
            formatter used for source code correction.
        config: User provided configuration options for yamlfix. A default
            `YamlfixConfig` is created when a falsy value such as `None` is
            given.
    """
    self.yaml = yaml.yaml
    if config:
        self.config = config
    else:
        self.config = YamlfixConfig()

fix(source_code)

Run all yaml source code fixers.

Parameters:

Name Type Description Default
source_code str

Source code to be corrected.

required

Returns:

Type Description
str

Corrected source code.

Source code in yamlfix/adapters.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def fix(self, source_code: str) -> str:
    """Apply every yaml source code fixer, in order, to the given text.

    The output of each fixer is the input of the next one, so the order of the
    pipeline matters: the jinja encoding runs before Ruamel's fixer and its
    matching restore step runs after it.

    Args:
        source_code: Source code to be corrected.

    Returns:
        Corrected source code.
    """
    log.debug("Running source code fixers...")

    pipeline = (
        self._fix_truthy_strings,
        self._fix_jinja_variables,
        self._ruamel_yaml_fixer,
        self._restore_truthy_strings,
        self._restore_jinja_variables,
        self._restore_double_exclamations,
        self._fix_comments,
        self._fix_flow_style_lists,
        self._fix_whitelines,
        self._fix_top_level_lists,
        self._add_newline_at_end_of_file,
    )

    fixed_source_code = source_code
    for step in pipeline:
        fixed_source_code = step(fixed_source_code)

    return fixed_source_code

Yaml

Adapter that holds the configured ruamel yaml fixer.

Source code in yamlfix/adapters.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class Yaml:
    """Adapter that holds the configured ruamel yaml fixer."""

    def __init__(self, config: Optional[YamlfixConfig]) -> None:
        """Create the yaml formatter and configure it with the yamlfix options.

        Args:
            config: Small set of user provided configuration options for
                yamlfix. A default `YamlfixConfig` is created when a falsy
                value such as `None` is given.
        """
        self.yaml = YAML()
        self.config = config if config else YamlfixConfig()

        representer = YamlfixRepresenter(
            self.config,
            self.yaml.default_style,
            self.yaml.default_flow_style,
            self.yaml,
        )
        # we have to call setattr with the string value, because the internal ruyaml
        # implementation does the same thing and does not expose the attribute itself
        setattr(self.yaml, "_representer", representer)  # noqa: B010

        self._base_configuration()

    def _base_configuration(self) -> None:
        """Apply the settings of the yamlfix config to Ruamel's yaml."""
        log.debug("Running ruamel yaml base configuration...")
        options = self.config
        formatter = self.yaml

        # Indentation of mappings and sequences
        formatter.indent(
            mapping=options.indent_mapping,
            sequence=options.indent_sequence,
            offset=options.indent_offset,
        )
        formatter.allow_duplicate_keys = options.allow_duplicate_keys

        # explicit_start makes the document begin with ---
        # ignore: variable has type None, what can we do, it doesn't have type hints...
        formatter.explicit_start = options.explicit_start  # type: ignore
        formatter.width = options.line_length  # type: ignore
        formatter.preserve_quotes = options.preserve_quotes  # type: ignore

__init__(config)

Initialize the yaml adapter with an optional yamlfix config.

Parameters:

Name Type Description Default
config Optional[YamlfixConfig]

Small set of user provided configuration options for yamlfix.

required
Source code in yamlfix/adapters.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self, config: Optional[YamlfixConfig]) -> None:
    """Create the yaml formatter and configure it with the yamlfix options.

    Args:
        config: Small set of user provided configuration options for yamlfix.
            A default `YamlfixConfig` is created when a falsy value such as
            `None` is given.
    """
    self.yaml = YAML()
    self.config = config if config else YamlfixConfig()

    representer = YamlfixRepresenter(
        self.config,
        self.yaml.default_style,
        self.yaml.default_flow_style,
        self.yaml,
    )
    # we have to call setattr with the string value, because the internal ruyaml
    # implementation does the same thing and does not expose the attribute itself
    setattr(self.yaml, "_representer", representer)  # noqa: B010

    self._base_configuration()

YamlfixRepresenter

Bases: RoundTripRepresenter

Yamlfix's custom implementation of the ruyaml.RoundTripRepresenter that can be configured with YamlfixConfig.

Source code in yamlfix/adapters.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class YamlfixRepresenter(RoundTripRepresenter):
    """Yamlfix's custom implementation of the ruyaml.RoundTripRepresenter\
        that can be configured with YamlfixConfig."""

    def __init__(
        self,
        config: YamlfixConfig,
        default_style: Optional[str] = None,
        default_flow_style: Optional[bool] = None,
        dumper: Optional[YAML] = None,
    ) -> None:
        """Initialize the parent RoundTripRepresenter and register the patches.

        Args:
            config: Configuration options for yamlfix, read by the patch
                functions and the `represent_*` methods.
            default_style: Passed as is to the parent representer.
            default_flow_style: Passed as is to the parent representer.
            dumper: Passed as is to the parent representer.
        """
        RoundTripRepresenter.__init__(
            self,
            default_style=default_style,
            default_flow_style=default_flow_style,
            dumper=dumper,
        )

        self.config: YamlfixConfig = config
        self.patch_functions: List[Callable[[Node, Node], None]] = []

        # Each configurer appends its patch function to self.patch_functions,
        # so they must run after the list is created
        self._configure_quotation_for_basic_values()
        self._configure_sequence_style()

    def represent_none(self, data: Any) -> ScalarNode:  # noqa: ANN401
        """Configure how Ruamel's yaml represents None values.

        When `none_representation` is unset or empty the parent representation
        is used. Otherwise the configured value (canonical values are "~",
        "null" or "NULL") is emitted as a null scalar.
        """
        none_representation = self.config.none_representation

        if none_representation is not None and none_representation != "":
            return self.represent_scalar("tag:yaml.org,2002:null", none_representation)

        return super().represent_none(data)

    def represent_str(self, data: Any) -> ScalarNode:  # noqa: ANN401
        """Configure Ruamel's yaml fixer to quote all yaml keys and simple* string values.

        Simple string values meaning: No multi line strings, as they are represented
        by LiteralScalarStrings instead.

        The quotation is only applied when `quote_keys_and_basic_values` is
        enabled and a `quote_representation` is configured, otherwise the
        parent representation is used.
        """
        config = self.config

        if config.quote_keys_and_basic_values and config.quote_representation is not None:
            return self.represent_scalar(
                "tag:yaml.org,2002:str", data, config.quote_representation
            )

        return super().represent_str(data)

    def represent_mapping(
        self, tag: Any, mapping: Any, flow_style: Optional[Any] = None  # noqa: ANN401
    ) -> MappingNode:
        """Represent the mapping with the parent representer and patch the result.

        Every registered patch function is called with the key node and the
        value node of each entry of the resulting mapping node.
        """
        mapping_node: MappingNode = super().represent_mapping(tag, mapping, flow_style)
        mapping_values: List[Tuple[ScalarNode, Node]] = mapping_node.value

        if not isinstance(mapping_values, list):
            return mapping_node

        for mapping_value in mapping_values:
            # Only the (key, value) tuples can be patched
            if not isinstance(mapping_value, tuple):
                continue

            key_node: Node = mapping_value[0]
            value_node: Node = mapping_value[1]
            for patch_function in self.patch_functions:
                patch_function(key_node, value_node)

        return mapping_node

    def _configure_quotation_for_basic_values(self) -> None:
        """Configure Ruamel's yaml fixer to quote only simple* yaml string values.

        Simple string values meaning: Any string that does not already have an
        explicit 'style' applied already -> multi line strings have a style value
        of "|" per default.

        The patch function registered here quotes the value of a mapping entry
        and, when that value is a sequence made only of scalars without
        non-empty comments, each of its items.
        """
        config = self.config
        log.debug("Setting up ruamel yaml 'quote simple values' configuration...")

        def patch_quotations(key_node: Node, value_node: Node) -> None:  # noqa: W0613
            # The configuration is read on every call, not when registering
            quoting_enabled = (
                config.quote_basic_values and config.quote_representation is not None
            )
            if not quoting_enabled:
                return

            # if this is a scalar value node itself, apply the quotations now
            self._apply_simple_value_quotations(value_node)

            # only sequences with a value present have items left to quote
            if isinstance(value_node, SequenceNode) and value_node.value is not None:
                sequence_node: SequenceNode = value_node

                # complex sequences and sequences with comments are left as is
                is_complex = self._seq_contains_non_scalar_nodes(
                    sequence_node
                ) or self._seq_contains_non_empty_comments(sequence_node)

                if not is_complex:
                    for seq_value in sequence_node.value:
                        self._apply_simple_value_quotations(seq_value)

        self.patch_functions.append(patch_quotations)

    def _configure_sequence_style(self) -> None:
        """Configure Ruamel's yaml fixer to represent lists as either block-style \
            or flow-style.

        Lists containing non-scalar values (other maps, lists), lists that
        contain comments and lists that would breach the line-length are forced
        to block-style, regardless of configuration.

        Lists in block-style look like this:
        ```
        list:
          # Comment for item
          - item
          - item
          - complex_item:
              # Comment for key
              key: value
        ```

        Lists in flow-style look like this, we do not convert lists with complex
        values or lists with comments to that style, it is meant for simple lists,
        that contain only scalar values (string, int, bool, etc.) not other complex
        values (lists, dicts, comments, etc.)
        ```
        list: [item, item, item]
        ```

        Empty lists are not handled well in either style, so they are skipped as well,
        as you can only represent empty lists in flow-style either way.
        """
        config = self.config
        log.debug("Setting up ruamel yaml 'sequence flow style' configuration...")

        def patch_sequence_style(key_node: Node, value_node: Node) -> None:
            # only sequences that are the value of a scalar key are patched
            if not (
                isinstance(key_node, ScalarNode) and isinstance(value_node, SequenceNode)
            ):
                return

            # don't modify the sequence style at all, if the config value is
            # set to `keep_style`
            if config.sequence_style == YamlNodeStyle.KEEP_STYLE:
                return

            sequence_node: SequenceNode = value_node

            # check if the sequence node value is present and if it is not empty
            if not sequence_node.value:
                return

            # block-style is forced for sequences that contain non-scalar nodes
            # (i.e. dicts, lists, etc.), non-empty comments, or that would breach
            # the line-width when rendered in flow-style. The width is roughly
            # calculated considering only scalars, in any case ruyaml will fold
            # flow-style lists if they breach the limit
            force_block_style: bool = (
                self._seq_contains_non_scalar_nodes(sequence_node)
                or self._seq_contains_non_empty_comments(sequence_node)
                or self._seq_length_longer_than_line_length(key_node, sequence_node)
            )

            if force_block_style:
                sequence_node.flow_style = False
            else:
                sequence_node.flow_style = (
                    config.sequence_style == YamlNodeStyle.FLOW_STYLE
                )

        self.patch_functions.append(patch_sequence_style)

    @staticmethod
    def _seq_contains_non_scalar_nodes(seq_node: Node) -> bool:
        """Tell if any item of the sequence node is not a ScalarNode."""
        return not all(isinstance(node, ScalarNode) for node in seq_node.value)

    @staticmethod
    def _seq_contains_non_empty_comments(seq_node: Node) -> bool:
        """Tell if any scalar item of the sequence node has a non-blank comment.

        Only the scalar nodes whose `comment` attribute is a list are inspected,
        and only the `CommentToken` entries of those lists are considered.
        """
        comment_tokens: List[CommentToken] = [
            comment_token
            for node in seq_node.value
            if isinstance(node, ScalarNode) and isinstance(node.comment, list)
            for comment_token in node.comment
        ]

        for comment_token in comment_tokens:
            if (
                isinstance(comment_token, CommentToken)
                and comment_token.value.strip() != ""
            ):
                return True

        return False

    def _seq_length_longer_than_line_length(
        self, key_node: Node, seq_node: Node
    ) -> bool:
        """Estimate if the sequence rendered in flow-style exceeds the line length.

        This could be made configurable, or rather we could calculate if we need
        the quotation spaces for the configured settings, but if we err on the
        side of caution we can always force block-mode even for values that
        could technically, without quotes, fit into the line-length.

        Only the scalar items of the sequence are taken into account.
        """
        config = self.config

        # quotation marks around scalar value
        quote_length: int = 2

        # comma and space between scalar values or colon and space
        # between key + values
        separator_length: int = 2

        # opening and closing brackets that should fit on the same line
        bracket_length: int = 2

        # every key or scalar is counted with its quotes and its separator
        overhead_per_value: int = quote_length + separator_length

        key_length: int = len(str(key_node.value)) + overhead_per_value
        scalar_length: int = sum(
            len(str(node.value)) + overhead_per_value
            for node in seq_node.value
            if isinstance(node, ScalarNode)
        )

        return key_length + scalar_length + bracket_length > config.line_length

    def _apply_simple_value_quotations(self, value_node: Node) -> None:
        """Set the configured quote style on string scalars without a style."""
        if not isinstance(value_node, ScalarNode):
            return
        if value_node.tag != "tag:yaml.org,2002:str":
            return
        # nodes with an explicit style already applied are left untouched
        if value_node.style is not None:
            return

        value_node.style = self.config.quote_representation

__init__(config, default_style=None, default_flow_style=None, dumper=None)

Initialize the YamlfixRepresenter and its parent RoundTripRepresenter.

Source code in yamlfix/adapters.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def __init__(
    self,
    config: YamlfixConfig,
    default_style: Optional[str] = None,
    default_flow_style: Optional[bool] = None,
    dumper: Optional[YAML] = None,
) -> None:
    """Set up the parent RoundTripRepresenter and run the patch configurers.

    Args:
        config: yamlfix configuration stored on the instance as ``self.config``.
        default_style: Forwarded unchanged to ``RoundTripRepresenter.__init__``.
        default_flow_style: Forwarded unchanged to
            ``RoundTripRepresenter.__init__``.
        dumper: Forwarded unchanged to ``RoundTripRepresenter.__init__``.
    """
    RoundTripRepresenter.__init__(
        self,
        default_style=default_style,
        default_flow_style=default_flow_style,
        dumper=dumper,
    )

    self.config: YamlfixConfig = config
    # Starts empty; represent_mapping later calls every entry with
    # (key_node, value_node).
    self.patch_functions: List[Callable[[Node, Node], None]] = []

    # The configurers run in this fixed order.
    configurers = (
        self._configure_quotation_for_basic_values,
        self._configure_sequence_style,
    )
    for configure in configurers:
        configure()

represent_mapping(tag, mapping, flow_style=None)

Modify / Patch the original ruyaml representer represent_mapping value and call the provided patch_function on its mapping_values.

Source code in yamlfix/adapters.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def represent_mapping(
    self, tag: Any, mapping: Any, flow_style: Optional[Any] = None  # noqa: ANN401
) -> MappingNode:
    """Represent a mapping with the parent class and patch its key/value pairs.

    The node is built by the parent ``represent_mapping``. Afterwards every
    function in ``self.patch_functions`` is called with ``(key_node, value_node)``
    for each tuple entry of the node's value. Nothing is patched when the node's
    value is not a list, and non-tuple entries are skipped.

    Returns:
        The mapping node produced by the parent representer.
    """
    mapping_node: MappingNode = super().represent_mapping(tag, mapping, flow_style)
    pairs = mapping_node.value

    if not isinstance(pairs, list):
        return mapping_node

    for pair in pairs:
        if not isinstance(pair, tuple):
            continue
        key_node: Node = pair[0]
        value_node: Node = pair[1]
        for patch in self.patch_functions:
            patch(key_node, value_node)

    return mapping_node

represent_none(data)

Configure how Ruamel's yaml represents None values.

Default is an empty representation, could be overridden by canonical values like "~", "null", "NULL"

Source code in yamlfix/adapters.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def represent_none(self, data: Any) -> ScalarNode:  # noqa: ANN401
    """Represent None values according to ``config.none_representation``.

    When the setting is None or an empty string the parent representation is
    used. Any other value (for example "~", "null" or "NULL") is emitted as a
    scalar tagged as a YAML null.
    """
    custom_none = self.config.none_representation

    if custom_none is not None and custom_none != "":
        return self.represent_scalar("tag:yaml.org,2002:null", custom_none)

    return super().represent_none(data)

represent_str(data)

Configure Ruamel's yaml fixer to quote all yaml keys and simple* string values.

Simple string values meaning: No multi line strings, as they are represented by LiteralScalarStrings instead.

Source code in yamlfix/adapters.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def represent_str(self, data: Any) -> ScalarNode:  # noqa: ANN401
    """Represent strings, quoting them when the configuration asks for it.

    When ``config.quote_keys_and_basic_values`` is enabled and
    ``config.quote_representation`` is set, the string is emitted as a YAML str
    scalar using that quote style. Otherwise the parent representation is used.

    Multi line strings are not meant to reach this path, as they are
    represented by LiteralScalarStrings instead.
    """
    settings = self.config
    quoting_enabled = settings.quote_keys_and_basic_values
    quote_style = settings.quote_representation

    if quoting_enabled and quote_style is not None:
        return self.represent_scalar("tag:yaml.org,2002:str", data, quote_style)

    return super().represent_str(data)

Define the configuration of the main program.

configure_yamlfix(yamlfix_config, config_files=None, additional_config=None)

Configure the YamlfixConfig object from .toml/.ini configuration files and additional config overrides.

Source code in yamlfix/config.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def configure_yamlfix(
    yamlfix_config: YamlfixConfig,
    config_files: Optional[List[str]] = None,
    additional_config: Optional[Dict[str, str]] = None,
) -> None:
    """Populate a YamlfixConfig from configuration files and explicit overrides.

    Args:
        yamlfix_config: Object that receives every resulting setting through
            ``setattr``.
        config_files: Passed to ``ProjectConfig`` as ``source_files``.
        additional_config: Overrides written on top of the loaded values. A
            non-empty ``"config_path"`` entry is also used as the starting path
            handed to ``ProjectConfig``.
    """
    overrides = additional_config if additional_config else {}

    path_override: Optional[str] = overrides.get("config_path")
    config_path: Optional[Path] = Path(path_override) if path_override else None

    config: ProjectConfig = ProjectConfig(
        config_schema=YamlfixConfig,
        merge_configs=True,
        project_name="yamlfix",
        source_files=config_files,
        starting_path=config_path,
    )

    # NOTE(review): the overrides are written into the dict returned by
    # to_dict(), and to_dict() is called again after validate(). This only has
    # an effect if to_dict() hands out the live internal dict - confirm.
    loaded_values: Dict[str, Any] = config.to_dict()
    for key, value in overrides.items():
        loaded_values[key] = value

    config.validate()

    for name, setting in config.to_dict().items():
        setattr(yamlfix_config, name, setting)

Define the different ways to expose the program functionality.

Functions

ANSIFGColorCode

Bases: Enum

ANSI escape codes for colored output.

Source code in yamlfix/entrypoints/__init__.py
12
13
14
15
16
17
18
19
20
21
22
23
class ANSIFGColorCode(Enum):
    """ANSI escape codes for colored output.

    Each value is the numeric code placed between the escape prefix and ``m``
    when ConsoleColorFormatter builds the colored log prefix.
    """

    # Foreground colors.
    BLACK = 30
    RED = 31
    GREEN = 32
    YELLOW = 33
    BLUE = 34
    MAGENTA = 35
    CYAN = 36
    WHITE = 37
    # Fallback used by ConsoleColorFormatter for log levels without a color.
    RESET = 0

ConsoleColorFormatter

Bases: logging.Formatter

Custom formatter that prints log levels to the console as colored plus signs.

Source code in yamlfix/entrypoints/__init__.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class ConsoleColorFormatter(logging.Formatter):
    """Formatter that prefixes each message with a plus sign colored by level."""

    # Log level -> color of the plus sign printed in front of the message.
    colors = {
        logging.DEBUG: ANSIFGColorCode.WHITE,
        # Only two named levels exist below WARNING, but three levels of
        # verbosity are needed. Level 15, half-way between DEBUG and INFO, is
        # the extra one and is currently used for logging unchanged files.
        15: ANSIFGColorCode.GREEN,
        logging.INFO: ANSIFGColorCode.CYAN,
        logging.WARNING: ANSIFGColorCode.YELLOW,
        logging.ERROR: ANSIFGColorCode.RED,
    }

    def format(self, record: logging.LogRecord) -> str:
        """Return the record rendered as ``[+] message`` with a colored plus.

        Levels missing from ``colors`` fall back to the RESET code.
        """
        level_color = self.colors.get(record.levelno, ANSIFGColorCode.RESET)
        colored_plus = f"\033[{level_color.value}m+\033[0m"
        # The format string is swapped per record so the parent class does the
        # actual rendering.
        self._style._fmt = f"[{colored_plus}] %(message)s"  # noqa: W0212
        return super().format(record)

format(record)

Format log records as a colored plus sign followed by the log message.

Source code in yamlfix/entrypoints/__init__.py
41
42
43
44
45
def format(self, record: logging.LogRecord) -> str:
    """Return the record rendered as ``[+] message`` with a colored plus.

    Levels missing from ``self.colors`` fall back to the RESET code.
    """
    level_color = self.colors.get(record.levelno, ANSIFGColorCode.RESET)
    colored_plus = f"\033[{level_color.value}m+\033[0m"
    # The format string is swapped per record so the parent class does the
    # actual rendering.
    self._style._fmt = f"[{colored_plus}] %(message)s"  # noqa: W0212
    return super().format(record)

load_logger(verbose=0)

Configure the Logging logger.

Parameters:

Name Type Description Default
verbose int

Allow more detailed logging output.

0
Source code in yamlfix/entrypoints/__init__.py
48
49
50
51
52
53
54
55
56
57
def load_logger(verbose: int = 0) -> None:
    """Configure the root logger to write colored output to stderr.

    Args:
        verbose: Allow more detailed logging output. Every step lowers the
            log level threshold by 5, starting from ``logging.INFO``.
    """
    threshold = logging.INFO - 5 * verbose
    logging.basicConfig(level=threshold, stream=sys.stderr)

    root_logger = logging.getLogger()
    for root_handler in root_logger.handlers:
        # Every handler gets its own formatter instance.
        root_handler.setFormatter(ConsoleColorFormatter())

Define program entities like configuration value entities.

YamlNodeStyle

Bases: Enum

Represent the desired YAML node style for sequences and mappings.

Source code in yamlfix/model.py
 8
 9
10
11
12
13
class YamlNodeStyle(Enum):
    """Represent the desired YAML node style for sequences and mappings.

    The string values are the spellings of each option; YamlfixConfig uses
    FLOW_STYLE as the default for ``sequence_style``.
    """

    FLOW_STYLE = "flow_style"
    BLOCK_STYLE = "block_style"
    # NOTE(review): presumably leaves the style found in the source untouched -
    # confirm against the code that consumes this enum.
    KEEP_STYLE = "keep_style"

YamlfixConfig

Bases: ConfigSchema

Configuration entity for yamlfix.

Source code in yamlfix/model.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class YamlfixConfig(ConfigSchema):
    """Configuration entity for yamlfix.

    Every attribute is one setting together with its default value.
    """

    allow_duplicate_keys: bool = False
    comments_min_spaces_from_content: int = 2
    comments_require_starting_space: bool = True
    comments_whitelines: int = 1
    whitelines: int = 0
    section_whitelines: int = 0
    # configure_yamlfix reads an override with this same name and uses it as
    # the starting path for the configuration lookup.
    config_path: Optional[str] = None
    explicit_start: bool = True
    indent_mapping: int = 2
    indent_offset: int = 2
    indent_sequence: int = 4
    # Limit that the estimated width of a key plus its sequence is compared to.
    line_length: int = 80
    # Empty (or None) keeps the representer's default None output; any other
    # text is emitted verbatim as the null scalar.
    none_representation: str = ""
    quote_basic_values: bool = False
    # When enabled, and quote_representation is set, strings are represented
    # with the quote_representation style.
    quote_keys_and_basic_values: bool = False
    preserve_quotes: bool = False
    # Scalar style applied when quoting values.
    quote_representation: str = "'"
    sequence_style: YamlNodeStyle = YamlNodeStyle.FLOW_STYLE

Define all the orchestration functionality required by the program to work.

Classes and functions that connect the different domain model objects with the adapters and handlers to achieve the program's purpose.

fix_code(source_code, config=None)

Fix yaml source code to correct the format.

It corrects these errors
  • Add --- at the beginning of the file.
  • Correct truthy strings: 'True' -> true, 'no' -> 'false'
  • Remove unnecessary apostrophes: title: 'Why we sleep' -> title: Why we sleep.

Parameters:

Name Type Description Default
source_code str

Source code to be corrected.

required
config Optional[YamlfixConfig]

Small set of user provided configuration options for yamlfix.

None

Returns:

Type Description
str

Corrected source code.

Source code in yamlfix/services.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def fix_code(source_code: str, config: Optional[YamlfixConfig] = None) -> str:
    """Fix yaml source code to correct the format.

    It corrects these errors:

        * Add --- at the beginning of the file.
        * Correct truthy strings: 'True' -> true, 'no' -> 'false'
        * Remove unnecessary apostrophes: `title: 'Why we sleep'` ->
            `title: Why we sleep`.

    Ansible vaults are returned untouched. A leading shebang line and a
    leading jinja2 header line (``#jinja2:`` or ``# jinja2:``) are set aside
    before fixing and put back unmodified, in their original order, in front
    of the fixed code.

    Args:
        source_code: Source code to be corrected.
        config: Small set of user provided configuration options for yamlfix.

    Returns:
        Corrected source code.
    """
    # Leave Ansible vaults unmodified
    if source_code.startswith("$ANSIBLE_VAULT;"):
        return source_code

    shebang = ""
    if source_code.startswith("#!"):
        # Skip the shebang line if present, leaving it unmodified. Without a
        # newline find() returns -1, eolpos is 0 and nothing is split off.
        eolpos = source_code.find("\n") + 1
        shebang = source_code[:eolpos]
        source_code = source_code[eolpos:]

    jinja2 = ""
    if source_code.startswith(("#jinja2:", "# jinja2:")):
        # Same treatment for the jinja2 header, which is looked for after the
        # shebang has been removed.
        eolpos = source_code.find("\n") + 1
        jinja2 = source_code[:eolpos]
        source_code = source_code[eolpos:]

    yaml = Yaml(config=config)
    fixer = SourceCodeFixer(yaml=yaml, config=config)

    source_code = fixer.fix(source_code=source_code)

    # The shebang was stripped first, so it goes back first: it must stay the
    # very first line of the file.
    return shebang + jinja2 + source_code

fix_files(files, dry_run=None, config=None)

Fix the yaml source code of a list of files.

If the input is taken from stdin, it will return the fixed value.

Parameters:

Name Type Description Default
files Files

List of files to fix.

required
dry_run Optional[bool]

Whether to write changes or not.

None
config Optional[YamlfixConfig]

Small set of user provided configuration options for yamlfix.

None

Returns:

Type Description
Union[Optional[str], Tuple[Optional[str], bool]]

A tuple with the following items:

  • Fixed code or None.
  • A bool to indicate whether at least one file has been changed.
Source code in yamlfix/services.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def fix_files(  # pylint: disable=too-many-branches
    files: Files, dry_run: Optional[bool] = None, config: Optional[YamlfixConfig] = None
) -> Union[Optional[str], Tuple[Optional[str], bool]]:  # noqa: TAE002
    """Fix the yaml source code of a list of files.

    Each entry is either a path, which is opened, read and rewritten, or an
    already open file object, which is read and rewritten in place. If the
    input is taken from stdin, the fixed value is returned instead of written.

    Args:
        files: List of files to fix.
        dry_run: Whether to write changes or not. When left as None a
            deprecation warning is emitted, changes are written, and the
            legacy return shape is used.
        config: Small set of user provided configuration options for yamlfix.

    Returns:
        When ``dry_run`` is None: the fixed code for stdin input, else None.

        Otherwise a tuple with the following items:
        * Fixed code (stdin input only) or None.
        * A bool to indicate whether at least one file has been changed.
    """
    legacy_return = dry_run is None

    if legacy_return:
        warnings.warn(
            """
            From 2023-01-12 fix_files will change the return type from
            `Optional[str]` to Tuple[Optional[str], bool], where the first
            element of the Tuple is the fixed source and the second a bool that
            returns whether the source has changed.

            For more information check https://github.com/lyz-code/yamlfix/pull/182
            """,
            UserWarning,
            stacklevel=2,
        )

    changed = False
    total_fixed = 0

    for file_ in files:
        is_path = isinstance(file_, str)

        if is_path:
            file_name = file_
            with open(file_, "r", encoding="utf-8") as reader:
                source = reader.read()
        else:
            source = file_.read()
            file_name = file_.name

        log.debug("Fixing file %s...", file_name)
        fixed_source = fix_code(source, config)
        needs_fix = fixed_source != source

        if not needs_fix:
            log.log(15, "%s is already well formatted", file_name)
        elif dry_run:
            changed = True
            log.info("Would fix %s", file_name)
        else:
            changed = True
            log.info("Fixed %s", file_name)
            total_fixed += 1

        # stdin is never written back: hand the result to the caller right away.
        if file_name == "<stdin>":
            if legacy_return:
                return fixed_source
            return fixed_source, changed

        if dry_run or not needs_fix:
            continue

        if is_path:
            with open(file_, "w", encoding="utf-8") as writer:
                writer.write(fixed_source)
        else:
            # Overwrite from the start and drop whatever is left of the old
            # content.
            file_.seek(0)
            file_.write(fixed_source)
            file_.truncate()

    checked = len(files)
    log.info(
        "Checked %d files: %d fixed, %d left unchanged",
        checked,
        total_fixed,
        checked - total_fixed,
    )

    if legacy_return:
        return None

    return None, changed

Utilities to retrieve the information of the program version.

version_info()

Display the version of the program, python and the platform.

Source code in yamlfix/version.py
11
12
13
14
15
16
17
18
19
20
def version_info() -> str:
    """Return a banner with the yamlfix version, the Python version and the platform.

    The Python version is the part of ``sys.version`` before its first space.
    """
    python_version = sys.version.partition(" ")[0]
    platform_name = platform.platform()
    banner = f"""\
        ------------------------------------------------------------------
             yamlfix: {__version__}
             Python: {python_version}
             Platform: {platform_name}
        ------------------------------------------------------------------"""
    return dedent(banner)

Last update: 2022-12-18