Skip to content

repository_orm

Library to ease the implementation of the repository pattern in Python projects.

AutoIncrementError

Bases: Exception

Raised when the id_ auto increment repository feature fails.

Source code in repository_orm/exceptions.py
12
13
class AutoIncrementError(Exception):
    """Signal a failure of the repository feature that auto increments the id_."""

Entity

Bases: BaseModel

Model of any object that is not defined by its attributes but instead has an identity.

Unlike value objects, they have identity equality. We can change their values, and they are still recognizably the same thing.

An entity with a negative id means that the id needs to be set by the repository.

The _defined_values are used to know which attributes were set by the user at the time of merging objects.

Source code in repository_orm/model.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class Entity(BaseModel):
    """Model of an object defined by its identity instead of by its attributes.

    Unlike value objects, they have *identity equality*. We can change their values, and
    they are still recognizably the same thing.

    An entity with a negative id means that the id needs to be set by the repository.

    The _defined_values are used to know which attributes were set by the user at the
    time of merging objects.
    """

    id_: EntityID = -1
    # Values explicitly given by the user, either at creation or by assignment.
    _defined_values: Dict[str, Any] = PrivateAttr()
    # Names of the attributes that `merge` must not copy from the other entity.
    _skip_on_merge: List[str] = []

    # ANN401: Any not allowed, but it's what we have.
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the entity and remember the values given by the user.

        Args:
            data: Attribute values of the entity. The same dictionary is kept as
                the defined values.
        """
        super().__init__(**data)
        self._defined_values = data

    def __lt__(self, other: "Entity") -> bool:
        """Assert if an object is smaller than us.

        Integer ids are compared numerically, any other combination is compared
        through the string representation of both ids.

        Args:
            other: Entity to compare.
        """
        if isinstance(other.id_, int) and isinstance(self.id_, int):
            return self.id_ < other.id_
        return str(self.id_) < str(other.id_)

    def __gt__(self, other: "Entity") -> bool:
        """Assert if an object is greater than us.

        Integer ids are compared numerically, any other combination is compared
        through the string representation of both ids.

        Args:
            other: Entity to compare.
        """
        if isinstance(other.id_, int) and isinstance(self.id_, int):
            return self.id_ > other.id_
        return str(self.id_) > str(other.id_)

    def __hash__(self) -> int:
        """Create a hash from the model name and the id_ of the entity."""
        return hash(f"{self.model_name}-{self.id_}")

    # ANN401: Any not allowed, but it's what we have.
    def __setattr__(self, attribute: str, value: Any) -> None:  # noqa: ANN401
        """Set the attribute and store it into the _defined_values.

        Args:
            attribute: Name of the attribute to set.
            value: Value to assign.
        """
        # Assign first: if the parent class rejects the assignment, the attribute
        # must not be recorded as defined, otherwise a later merge would try to
        # propagate a value that was never set.
        super().__setattr__(attribute, value)
        if attribute != "_defined_values":
            self._defined_values[attribute] = value

    @property
    def model_name(self) -> str:
        """Return the entity model name, taken from the title of the schema."""
        return self.schema()["title"]

    def merge(self, other: "Entity") -> "Entity":
        """Update the attributes with the ones manually set by the user of other.

        If the other object has default values not set by the user, they won't be
        propagated to `self`.

        Args:
            other: Entity to merge into `self`.

        Returns:
            `self` once it's updated.

        Raises:
            ValueError: If `other` is not an instance of the class of `self` or if
                the ids of both objects differ.
        """
        if not isinstance(other, type(self)):
            # `other` may not be an Entity at all, so don't assume it has a
            # model_name when building the error message.
            other_name = getattr(other, "model_name", type(other).__name__)
            raise ValueError(
                "Can't merge objects of different models "
                f"({self.model_name} with {other_name})."
            )
        if self.id_ != other.id_:
            raise ValueError(f"Can't merge two {self.model_name}s with different ids")

        # Merge objects
        # W0212: access to an internal property, but it's managed by us so there is
        # no problem on it.
        for attribute, value in other._defined_values.items():  # noqa: W0212
            if attribute not in self._skip_on_merge:
                setattr(self, attribute, value)

        return self

    @property
    def defined_values(self) -> Dict[str, Any]:
        """Return the entity defined values."""
        return self._defined_values

    def clear_defined_values(self) -> None:
        """Remove all references to defined values.

        I tried to return self so that it can be used chained with repo.get(), but I get
        a mypy error `Incompatible return value type (got "Entity", expected "Entity")`
        """
        self._defined_values = {}

__gt__(other)

Assert if an object is greater than us.

Parameters:

Name Type Description Default
other Entity

Entity to compare.

required
Source code in repository_orm/model.py
46
47
48
49
50
51
52
53
54
def __gt__(self, other: "Entity") -> bool:
    """Tell whether this entity is greater than another one.

    Two integer ids are compared numerically, otherwise the string representation of
    both ids is compared.

    Args:
        other: Entity to compare.
    """
    theirs, ours = other.id_, self.id_
    if not (isinstance(theirs, int) and isinstance(ours, int)):
        # At least one id is not an integer: fall back to comparing strings.
        ours, theirs = str(ours), str(theirs)
    return ours > theirs

__hash__()

Create a unique hash of the class object.

Source code in repository_orm/model.py
56
57
58
def __hash__(self) -> int:
    """Build the hash of the entity from its model name and its id_."""
    identity = f"{self.model_name}-{self.id_}"
    return hash(identity)

__init__(**data)

Initialize the defined values.

Source code in repository_orm/model.py
31
32
33
34
def __init__(self, **data: Any) -> None:  # noqa: ANN401
    """Initialize the entity and the defined values.

    Args:
        data: Attribute values given by the user. They are forwarded to the parent
            initializer and the same dictionary is kept as the defined values.
    """
    super().__init__(**data)
    # Remember what the user set explicitly, `merge` only propagates these values.
    self._defined_values = data

__lt__(other)

Assert if an object is smaller than us.

Parameters:

Name Type Description Default
other Entity

Entity to compare.

required
Source code in repository_orm/model.py
36
37
38
39
40
41
42
43
44
def __lt__(self, other: "Entity") -> bool:
    """Tell whether this entity is smaller than another one.

    Two integer ids are compared numerically, otherwise the string representation of
    both ids is compared.

    Args:
        other: Entity to compare.
    """
    theirs, ours = other.id_, self.id_
    if not (isinstance(theirs, int) and isinstance(ours, int)):
        # At least one id is not an integer: fall back to comparing strings.
        ours, theirs = str(ours), str(theirs)
    return ours < theirs

__setattr__(attribute, value)

Store the set attribute into the _defined_values.

Source code in repository_orm/model.py
61
62
63
64
65
def __setattr__(self, attribute: str, value: Any) -> None:  # noqa: ANN401
    """Set the attribute and store it into the _defined_values.

    Args:
        attribute: Name of the attribute to set.
        value: Value to assign.
    """
    # Assign first: if the parent class rejects the assignment, the attribute must
    # not be recorded as defined, otherwise a later merge would try to propagate a
    # value that was never set.
    super().__setattr__(attribute, value)
    if attribute != "_defined_values":
        self._defined_values[attribute] = value

clear_defined_values()

Remove all references to defined values.

I tried to return self so that it can be used chained with repo.get(), but I get a mypy error Incompatible return value type (got "Entity", expected "Entity")

Source code in repository_orm/model.py
103
104
105
106
107
108
109
def clear_defined_values(self) -> None:
    """Forget every value that was recorded as defined by the user.

    Returning self would allow chaining it with repo.get(), but doing so triggers the
    mypy error `Incompatible return value type (got "Entity", expected "Entity")`,
    so nothing is returned.
    """
    self._defined_values = dict()

defined_values() property

Return the entity defined values.

Source code in repository_orm/model.py
 98
 99
100
101
@property
def defined_values(self) -> Dict[str, Any]:
    """Expose the dictionary of values defined by the user."""
    user_values = self._defined_values
    return user_values

merge(other)

Update the attributes with the ones manually set by the user of other.

If the other object has default values not set by the user, they won't be propagated to self.

Parameters:

Name Type Description Default
other Entity

Entity to compare.

required
Source code in repository_orm/model.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def merge(self, other: "Entity") -> "Entity":
    """Update the attributes with the ones manually set by the user of other.

    If the other object has default values not set by the user, they won't be
    propagated to `self`.

    Args:
        other: Entity to merge into `self`.

    Returns:
        `self` once it's updated.

    Raises:
        ValueError: If `other` is not an instance of the class of `self` or if the
            ids of both objects differ.
    """
    if not isinstance(other, type(self)):
        # `other` may not be an Entity at all, so don't assume it has a model_name
        # when building the error message.
        other_name = getattr(other, "model_name", type(other).__name__)
        raise ValueError(
            "Can't merge objects of different models "
            f"({self.model_name} with {other_name})."
        )
    if self.id_ != other.id_:
        raise ValueError(f"Can't merge two {self.model_name}s with different ids")

    # Merge objects
    # W0212: access to an internal property, but it's managed by us so there is
    # no problem on it.
    for attribute, value in other._defined_values.items():  # noqa: W0212
        if attribute not in self._skip_on_merge:
            setattr(self, attribute, value)

    return self

model_name() property

Return the entity model name.

Source code in repository_orm/model.py
67
68
69
70
@property
def model_name(self) -> str:
    """Return the name of the model, which is the title of its schema."""
    model_schema = self.schema()
    return model_schema["title"]

EntityNotFoundError

Bases: Exception

Raised when the search or retrieve of an entity fails.

Source code in repository_orm/exceptions.py
4
5
class EntityNotFoundError(Exception):
    """Signal that searching or retrieving an entity gave no result."""

FakeRepository

Bases: Repository

Implement the repository pattern using a memory dictionary.

Source code in repository_orm/adapters/data/fake.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class FakeRepository(Repository):
    """Implement the repository pattern using a memory dictionary.

    Committed entities live in `self.entities` and the changes that are not yet
    committed are staged in `self.new_entities`. Both map a model class to a
    dictionary of its entities indexed by their `id_`.
    """

    def __init__(
        self,
        database_url: str = "",
    ) -> None:
        """Initialize the repository attributes.

        Args:
            database_url: Only checked against `/inexistent_dir/database.db`, which
                makes the initialization fail.

        Raises:
            ConnectionError: If `database_url` is `/inexistent_dir/database.db`.
        """
        super().__init__()
        if database_url == "/inexistent_dir/database.db":
            raise ConnectionError(f"Could not create database file: {database_url}")
        # ignore: Type variable "repository_orm.adapters.data.fake.Entity" is unbound
        # I don't know how to fix this
        self.entities: FakeRepositoryDB[EntityT] = {}  # type: ignore
        self.new_entities: FakeRepositoryDB[EntityT] = {}  # type: ignore
        self.is_connection_closed = False

    def _add(self, entity: EntityT) -> EntityT:
        """Append an entity to the repository.

        The entity is only staged, it's not persisted until `commit` is called.

        Args:
            entity: Entity to add to the repository.

        Returns:
            entity
        """
        if self.new_entities == {}:
            # Stage over a deep copy so the committed data stays untouched.
            self.new_entities = copy.deepcopy(self.entities)

        self.new_entities.setdefault(type(entity), {})[entity.id_] = entity

        return entity

    def delete(self, entity: EntityT) -> None:
        """Delete an entity from the repository.

        The deletion is only staged, it's not persisted until `commit` is called.

        Args:
            entity: Entity to remove from the repository.

        Raises:
            EntityNotFoundError: If there are no entities of the model of `entity`.
        """
        if self.new_entities == {}:
            # Stage over a deep copy so the committed data stays untouched.
            self.new_entities = copy.deepcopy(self.entities)
        try:
            # NOTE(review): because of the `None` default, a missing id_ of a known
            # model is silently ignored instead of raising. Confirm it's intended.
            self.new_entities[type(entity)].pop(entity.id_, None)
        except KeyError as error:
            raise EntityNotFoundError(
                f"Unable to delete entity {entity} because it's not in the repository"
            ) from error

    def _get(
        self,
        value: EntityID,
        model: Type[EntityT],
        attribute: str = "id_",
    ) -> List[EntityT]:
        """Obtain all entities from the repository that match an id_.

        If the attribute argument is passed, check that attribute instead.

        Args:
            value: Value of the entity attribute to obtain.
            model: Entity class to obtain.
            attribute: Entity attribute to check.

        Returns:
            entities: Deep copies of all the entities that match the criteria.
        """
        matching_entities = []

        if attribute == "id_":
            # Direct lookup, a missing model or id_ means no match.
            with suppress(KeyError):
                matching_entities.append(self.entities[model][value])
        else:
            matching_entities = self._search({attribute: value}, model)

        return copy.deepcopy(matching_entities)

    def _all(self, model: Type[EntityT]) -> List[EntityT]:
        """Get all the entities from the repository that match a model.

        Particular implementation of the database adapter.

        Args:
            model: Entity class to obtain.

        Returns:
            Sorted list of the committed entities of the model, empty if there is
            none.
        """
        with suppress(KeyError):
            return sorted(self.entities[model].values())

        return []

    def commit(self) -> None:
        """Persist the changes into the repository."""
        self.entities.update(self.new_entities)
        self.new_entities = {}

    def _search(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Get the entities whose attributes match one or several conditions.

        Particular implementation of the database adapter.

        Args:
            model: Entity class to obtain.
            fields: Dictionary with the {key}:{value} to search.

        Returns:
            entities: List of Entity object that matches the search criteria.
        """
        all_entities = self.all(model)
        entities_dict = {entity.id_: entity for entity in all_entities}
        entity_attributes = {entity.id_: entity.dict() for entity in all_entities}

        # Each condition narrows down the candidates left by the previous one.
        for key, value in fields.items():
            # Get entities that have the value `value`
            entities_with_value = entity_attributes | grep(
                value, use_regexp=True, strict_checking=False
            )
            matching_entity_attributes = {}

            try:
                entities_with_value["matched_values"]
            except KeyError:
                # No entity holds the value, so nothing can match all conditions.
                return []

            for path in entities_with_value["matched_values"]:
                # The id is the first index of the path: root[<id>][...]
                entity_id = re.sub(r"root\['?(.*?)'?\]\[.*", r"\1", path)

                # Convert int ids from str to int
                try:
                    # ignore: waiting for ADR-006 to be resolved
                    entity_id = int(entity_id)  # type: ignore
                except ValueError:
                    entity_id = re.sub(r"'(.*)'", r"\1", entity_id)

                # Add the entity to the matching ones only if the value is of the
                # attribute `key`.
                if re.match(rf"root\['?{entity_id}'?\]\['{key}'\]", path):
                    matching_entity_attributes[entity_id] = extract(
                        entity_attributes, f"root[{entity_id}]"
                    )
            # ignore: waiting for ADR-006 to be resolved
            entity_attributes = matching_entity_attributes  # type: ignore

        return [entities_dict[entity_id] for entity_id in entity_attributes]

    def apply_migrations(self, migrations_directory: str) -> None:
        """Run the migrations of the repository schema.

        Args:
            migrations_directory: path to the directory containing the migration
                scripts.
        """
        # The fake repository doesn't have any schema

    def last(
        self,
        model: Type[EntityT],
    ) -> EntityT:
        """Get the biggest entity from the repository.

        Both the committed and the staged entities are taken into account.

        Args:
            model: Entity class to obtain.

        Returns:
            entity: Biggest Entity object that matches a model.

        Raises:
            EntityNotFoundError: If there are no entities.
        """
        # `_staged_entities` raises KeyError when the model was never staged, and
        # `max` raises ValueError when the model is staged without entities (for
        # example after deleting all of them). Both mean "no staged entities".
        try:
            last_index_entity = super().last(model)
        except EntityNotFoundError as empty_repo:
            try:
                # Empty repo but entities staged to be committed.
                return max(self._staged_entities(model))
            except (KeyError, ValueError) as no_staged_entities:
                # Empty repo and no entities staged.
                raise empty_repo from no_staged_entities

        try:
            last_staged_entity = max(self._staged_entities(model))
        except (KeyError, ValueError):
            # Full repo and no staged entities.
            return last_index_entity

        # Full repo and staged entities.
        return max([last_index_entity, last_staged_entity])

    def _staged_entities(self, model: Type[EntityT]) -> List[EntityT]:
        """Return a list of staged entities of a model type.

        Args:
            model: Return only instances of this model.

        Raises:
            KeyError: If nothing of the model is staged.
        """
        return list(self.new_entities[model].values())

    def close(self) -> None:
        """Close the connection to the database."""
        self.is_connection_closed = True

    @property
    def is_closed(self) -> bool:
        """Inform if the connection is closed."""
        return self.is_connection_closed

    def empty(self) -> None:
        """Remove all entities from the repository, both committed and staged."""
        self.entities = {}
        self.new_entities = {}

__init__(database_url='')

Initialize the repository attributes.

Source code in repository_orm/adapters/data/fake.py
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(
    self,
    database_url: str = "",
) -> None:
    """Initialize the repository attributes.

    Args:
        database_url: Only checked against `/inexistent_dir/database.db`, which
            makes the initialization fail.

    Raises:
        ConnectionError: If `database_url` is `/inexistent_dir/database.db`.
    """
    super().__init__()
    if database_url == "/inexistent_dir/database.db":
        raise ConnectionError(f"Could not create database file: {database_url}")
    # ignore: Type variable "repository_orm.adapters.data.fake.Entity" is unbound
    # I don't know how to fix this
    # Committed entities.
    self.entities: FakeRepositoryDB[EntityT] = {}  # type: ignore
    # Entities staged until the next commit.
    self.new_entities: FakeRepositoryDB[EntityT] = {}  # type: ignore
    self.is_connection_closed = False

apply_migrations(migrations_directory)

Run the migrations of the repository schema.

Parameters:

Name Type Description Default
migrations_directory str

path to the directory containing the migration scripts.

required
Source code in repository_orm/adapters/data/fake.py
178
179
180
181
182
183
184
def apply_migrations(self, migrations_directory: str) -> None:
    """Apply the migration scripts to the schema of the repository.

    This implementation does nothing with the directory.

    Args:
        migrations_directory: path to the directory containing the migration
            scripts.
    """
    return None

close()

Close the connection to the database.

Source code in repository_orm/adapters/data/fake.py
229
230
231
def close(self) -> None:
    """Mark the connection to the database as closed."""
    self.is_connection_closed = True

commit()

Persist the changes into the repository.

Source code in repository_orm/adapters/data/fake.py
118
119
120
121
122
def commit(self) -> None:
    """Move the staged entities into the committed ones and clear the stage."""
    staged = self.new_entities
    # Each staged model replaces the committed entities of that same model.
    self.entities.update(staged)
    self.new_entities = {}

delete(entity)

Delete an entity from the repository.

Parameters:

Name Type Description Default
entity EntityT

Entity to remove from the repository.

required

Raises:

Type Description
EntityNotFoundError

If the entity is not found.

Source code in repository_orm/adapters/data/fake.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def delete(self, entity: EntityT) -> None:
    """Stage the removal of an entity from the repository.

    Args:
        entity: Entity to remove from the repository.

    Raises:
        EntityNotFoundError: If there are no entities of the model of `entity`.
    """
    staged = self.new_entities
    if staged == {}:
        # Nothing staged yet: work over a deep copy of the committed entities.
        staged = copy.deepcopy(self.entities.copy())
        self.new_entities = staged

    try:
        model_entities = staged[type(entity)]
    except KeyError as error:
        raise EntityNotFoundError(
            f"Unable to delete entity {entity} because it's not in the repository"
        ) from error

    # An id_ that is not staged is ignored thanks to the default value.
    model_entities.pop(entity.id_, None)

empty()

Remove all entities from the repository.

Source code in repository_orm/adapters/data/fake.py
238
239
240
241
def empty(self) -> None:
    """Drop every entity of the repository, both committed and staged."""
    self.entities, self.new_entities = {}, {}

is_closed() property

Inform if the connection is closed.

Source code in repository_orm/adapters/data/fake.py
233
234
235
236
@property
def is_closed(self) -> bool:
    """Tell whether the connection was marked as closed."""
    closed = self.is_connection_closed
    return closed

last(model)

Get the biggest entity from the repository.

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class to obtain.

required

Returns:

Name Type Description
entity EntityT

Biggest Entity object that matches a model.

Raises:

Type Description
EntityNotFoundError

If there are no entities.

Source code in repository_orm/adapters/data/fake.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def last(
    self,
    model: Type[EntityT],
) -> EntityT:
    """Get the biggest entity from the repository.

    Both the committed and the staged entities are taken into account.

    Args:
        model: Entity class to obtain.

    Returns:
        entity: Biggest Entity object that matches a model.

    Raises:
        EntityNotFoundError: If there are no entities.
    """
    # `_staged_entities` raises KeyError when the model was never staged, and `max`
    # raises ValueError when the model is staged without entities (for example
    # after deleting all of them). Both mean "no staged entities".
    try:
        last_index_entity = super().last(model)
    except EntityNotFoundError as empty_repo:
        try:
            # Empty repo but entities staged to be committed.
            return max(self._staged_entities(model))
        except (KeyError, ValueError) as no_staged_entities:
            # Empty repo and no entities staged.
            raise empty_repo from no_staged_entities

    try:
        last_staged_entity = max(self._staged_entities(model))
    except (KeyError, ValueError):
        # Full repo and no staged entities.
        return last_index_entity

    # Full repo and staged entities.
    return max([last_index_entity, last_staged_entity])

File

Bases: Entity, Generic[AnyStr]

Model a computer file.

Source code in repository_orm/model.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class File(Entity, Generic[AnyStr]):
    """Represent a file of a computer."""

    path: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    owner: Optional[str] = None
    group: Optional[str] = None
    permissions: Optional[str] = None

    # The use of a private attribute and the impossibility of loading the content
    # at object creation will be fixed on Pydantic 1.9.
    # We will be able to define the excluded attribute content in the Config of the
    # model.
    #
    # For more information on how to improve this code, read this:
    # https://lyz-code.github.io/blue-book/coding/python/pydantic/#define-fields-to-exclude-from-exporting-at-config-level # noqa:E501
    _content: Optional[AnyStr] = PrivateAttr(None)
    # If the content is of type bytes
    is_bytes: bool = False

    @property
    def basename(self) -> str:
        """Return the last component of the path, the name of the file."""
        _, name = os.path.split(self.path)
        return name

    @property
    def dirname(self) -> str:
        """Return the directory part of the path of the file."""
        directory, _ = os.path.split(self.path)
        return directory

    @property
    def extension(self) -> str:
        """Return the text that follows the last dot of the basename.

        If the basename has no dot, the whole basename is returned.
        """
        return self.basename.rpartition(".")[2]

    @property
    def content(self) -> AnyStr:
        """Give access to the loaded content of the file.

        Returns:
            The content of the file.

        Raises:
            FileContentNotLoadedError: if the content is not yet loaded.
        """
        loaded = self._content
        if loaded is not None:
            return loaded
        raise FileContentNotLoadedError(
            "The content of the file has not been loaded yet."
        )

basename() property

Return the name of the file.

Source code in repository_orm/model.py
137
138
139
140
@property
def basename(self) -> str:
    """Return the last component of the path, the name of the file."""
    _, name = os.path.split(self.path)
    return name

content() property

Return the content of the file.

Returns:

Type Description
AnyStr

The content of the file.

Raises:

Type Description
FileContentNotLoadedError

if the content is not yet loaded.

Source code in repository_orm/model.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@property
def content(self) -> AnyStr:
    """Give access to the loaded content of the file.

    Returns:
        The content of the file.

    Raises:
        FileContentNotLoadedError: if the content is not yet loaded.
    """
    loaded = self._content
    if loaded is not None:
        return loaded
    raise FileContentNotLoadedError(
        "The content of the file has not been loaded yet."
    )

dirname() property

Return the directory name of the file.

Source code in repository_orm/model.py
142
143
144
145
@property
def dirname(self) -> str:
    """Return the directory part of the path of the file."""
    directory, _ = os.path.split(self.path)
    return directory

extension() property

Return the extension of the file.

Source code in repository_orm/model.py
147
148
149
150
@property
def extension(self) -> str:
    """Return the text that follows the last dot of the basename.

    If the basename has no dot, the whole basename is returned.
    """
    return self.basename.rpartition(".")[2]

LocalFileRepository

Bases: FileRepository[AnyStr]

Define the local filesystem adapter.

Source code in repository_orm/adapters/file/local_file.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class LocalFileRepository(FileRepository[AnyStr]):
    """Define the local filesystem adapter."""

    def __init__(self, workdir: str) -> None:
        """Initialize the object.

        Creates the working directory if it doesn't exist.

        Args:
            workdir: Path to the working directory of the repository.
        """
        if not os.path.exists(workdir):
            os.makedirs(workdir)
        super().__init__(workdir=workdir)

    def load(self, file_: File[AnyStr]) -> File[AnyStr]:
        """Load the content of the file from the persistence system.

        Args:
            file_: File whose content is read, as bytes if `is_bytes` is set and as
                UTF-8 text otherwise.

        Returns:
            The file returned by `fix_path` with its content loaded.
        """
        log.debug(f"Loading content of file {file_.path}")
        file_ = self.fix_path(file_)
        if file_.is_bytes:
            mode = "rb"
            encoding = None
        else:
            mode = "r"
            encoding = "utf-8"

        with open(
            os.path.expanduser(file_.path), mode, encoding=encoding
        ) as file_descriptor:
            # W0212: Access to private attribute, but it's managed by us so it's OK
            file_._content = file_descriptor.read()  # noqa: W0212
        return file_

    def save(self, file_: File[AnyStr]) -> File[AnyStr]:
        """Save the content of the file into the persistence system.

        Args:
            file_: File whose content is written, as bytes if `is_bytes` is set and
                as UTF-8 text otherwise.

        Returns:
            The file returned by `fix_path`.
        """
        log.debug(f"Saving the content of file {file_.path}")
        file_ = self.fix_path(file_)
        if file_.is_bytes:
            mode = "wb+"
            encoding = None
        else:
            mode = "w+"
            # Write with the same encoding `load` reads with, instead of relying on
            # the default of the platform.
            encoding = "utf-8"

        with open(
            os.path.expanduser(file_.path), mode, encoding=encoding
        ) as file_descriptor:
            file_descriptor.write(file_.content)

        return file_

    def delete(self, file_: File[AnyStr]) -> None:
        """Delete the file from the persistence system.

        A file that doesn't exist only produces a warning in the log.

        Args:
            file_: File to remove.
        """
        log.debug(f"Deleting the content of file {file_.path}")
        try:
            os.remove(os.path.expanduser(file_.path))
        except FileNotFoundError:
            log.warning(
                f"Can't remove the file {file_.path} as it doesn't exist "
                "in the file repository."
            )

__init__(workdir)

Initialize the object.

Creates the working directory if it doesn't exist.

Source code in repository_orm/adapters/file/local_file.py
16
17
18
19
20
21
22
23
def __init__(self, workdir: str) -> None:
    """Prepare the repository over a working directory.

    The working directory is created when it's missing.
    """
    workdir_is_missing = not os.path.exists(workdir)
    if workdir_is_missing:
        os.makedirs(workdir)
    super().__init__(workdir=workdir)

delete(file_)

Delete the file from the persistence system.

Source code in repository_orm/adapters/file/local_file.py
57
58
59
60
61
62
63
64
65
66
def delete(self, file_: File[AnyStr]) -> None:
    """Remove the file from the persistence system.

    A file that doesn't exist only produces a warning in the log.
    """
    log.debug(f"Deleting the content of file {file_.path}")
    expanded_path = os.path.expanduser(file_.path)
    try:
        os.remove(expanded_path)
    except FileNotFoundError:
        message = (
            f"Can't remove the file {file_.path} as it doesn't exist "
            "in the file repository."
        )
        log.warning(message)

load(file_)

Load the content of the file from the persistence system.

Source code in repository_orm/adapters/file/local_file.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def load(self, file_: File[AnyStr]) -> File[AnyStr]:
    """Read the content of the file from the persistence system.

    The content is read as bytes if `is_bytes` is set and as UTF-8 text otherwise.
    """
    log.debug(f"Loading content of file {file_.path}")
    file_ = self.fix_path(file_)
    mode, encoding = ("rb", None) if file_.is_bytes else ("r", "utf-8")

    expanded_path = os.path.expanduser(file_.path)
    with open(expanded_path, mode, encoding=encoding) as handle:
        # W0212: Access to private attribute, but it's managed by us so it's OK
        file_._content = handle.read()  # noqa: W0212
    return file_

save(file_)

Save the content of the file into the persistence system.

Source code in repository_orm/adapters/file/local_file.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def save(self, file_: File[AnyStr]) -> File[AnyStr]:
    """Save the content of the file into the persistence system.

    Args:
        file_: File whose content is written, as bytes if `is_bytes` is set and as
            UTF-8 text otherwise.

    Returns:
        The file returned by `fix_path`.
    """
    log.debug(f"Saving the content of file {file_.path}")
    file_ = self.fix_path(file_)
    if file_.is_bytes:
        mode = "wb+"
        encoding = None
    else:
        mode = "w+"
        # Text is written as UTF-8 instead of relying on the default encoding of
        # the platform, so the result doesn't depend on the locale.
        encoding = "utf-8"

    with open(
        os.path.expanduser(file_.path), mode, encoding=encoding
    ) as file_descriptor:
        file_descriptor.write(file_.content)

    return file_

PypikaRepository

Bases: Repository

Implement the repository pattern using the Pypika query builder.

Source code in repository_orm/adapters/data/pypika.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class PypikaRepository(Repository):
    """Implement the repository pattern using the Pypika query builder.

    The queries are built with Pypika and run through a sqlite3 connection.
    """

    def __init__(
        self,
        database_url: str = "",
    ) -> None:
        """Initialize the repository attributes.

        Creates the database file if it doesn't exist and opens the connection.

        Args:
            database_url: URL specifying the connection to the database.

        Raises:
            ConnectionError: If the database file can't be created.
        """
        super().__init__(database_url)
        database_file = database_url.replace("sqlite:///", "")
        if not os.path.isfile(database_file):
            try:
                # Append mode creates the file without truncating it, the
                # context manager takes care of closing it.
                with open(database_file, "a", encoding="utf-8"):
                    pass
            except FileNotFoundError as error:
                raise ConnectionError(
                    f"Could not create the database file: {database_file}"
                ) from error
        self.connection = sqlite3.connect(database_file)
        self.connection.create_function("REGEXP", 2, _regexp)
        self.cursor = self.connection.cursor()

    def _execute(self, query: Union[Query, str]) -> sqlite3.Cursor:
        """Execute an SQL statement from a Pypika query object.

        Args:
            query: Pypika query or raw SQL string.

        Returns:
            The cursor returned by the execution of the statement.
        """
        return self.cursor.execute(str(query))

    @staticmethod
    def _table(entity: EntityT) -> Table:
        """Return the table of the selected entity object."""
        return Table(entity.model_name.lower())

    @staticmethod
    def _table_model(model: Type[EntityT]) -> Table:
        """Return the table of the selected entity class."""
        return Table(model.__name__.lower())

    def _add(self, entity: EntityT) -> EntityT:
        """Append an entity to the repository.

        The entity is upserted: if a row with the same id exists it's updated.

        Args:
            entity: Entity to add to the repository.

        Returns:
            entity
        """
        table = self._table(entity)
        entity_data = entity.dict()
        # The entity attribute `id_` is stored in the `id` column.
        columns = ["id" if key == "id_" else key for key in entity_data]
        values = list(entity_data.values())
        insert_query = Query.into(table).columns(tuple(columns)).insert(tuple(values))
        # Until https://github.com/kayak/pypika/issues/535 is solved we need to write
        # The upsert statement ourselves.
        # nosec: B608:hardcoded_sql_expressions, Possible SQL injection vector through
        #   string-based query construction. We're not letting the user define the
        #   values of the query, the only variable inputs are the keys, that are
        #   defined by the developer, so it's not probable that he chooses an
        #   entity attributes that are an SQL injection. Once the #535 issue is
        #   solved, we should get rid of this error too.
        upsert_query = (
            str(insert_query)
            + " ON CONFLICT(id) DO UPDATE SET "  # nosec
            + ", ".join([f"{key}=excluded.{key}" for key in columns])
        )
        self._execute(upsert_query)

        return entity

    def delete(self, entity: EntityT) -> None:
        """Delete an entity from the repository.

        Args:
            entity: Entity to remove from the repository.

        Raises:
            EntityNotFoundError: If the entity is not found.
        """
        table = self._table(entity)
        try:
            self.get(entity.id_, type(entity))
        except EntityNotFoundError as error:
            raise EntityNotFoundError(
                f"Unable to delete entity {entity} because it's not in the repository"
            ) from error
        query = Query.from_(table).delete().where(table.id == entity.id_)
        self._execute(query)

    def _get(
        self,
        value: EntityID,
        model: Type[EntityT],
        attribute: str = "id_",
    ) -> List[EntityT]:
        """Obtain all entities from the repository that match an id_.

        If the attribute argument is passed, check that attribute instead.

        Args:
            value: Value of the entity attribute to obtain.
            model: Entity class to obtain.
            attribute: Entity attribute to check.

        Returns:
            entities: All entities that match the criteria.
        """
        table = self._table_model(model)
        # The entity attribute `id_` is stored in the `id` column.
        column = "id" if attribute == "id_" else attribute
        query = Query.from_(table).select("*").where(getattr(table, column) == value)

        return self._build_entities(model, query)

    def _all(self, model: Type[EntityT]) -> List[EntityT]:
        """Get all the entities from the repository that match a model.

        Particular implementation of the database adapter.

        Args:
            model: Entity class to obtain.
        """
        table = self._table_model(model)
        query = Query.from_(table).select("*")
        return self._build_entities(model, query)

    def _build_entities(self, model: Type[EntityT], query: Query) -> List[EntityT]:
        """Build Entity objects from the data extracted from the database.

        Args:
            model: Entity class model to build.
            query: pypika query of the entities you want to build

        Returns:
            One entity per row returned by the query.
        """
        cursor = self._execute(query)

        entities_data = cursor.fetchall()
        attributes = [description[0] for description in cursor.description]

        entities = []
        for entity_data in entities_data:
            entity_dict = dict(zip(attributes, entity_data))
            # The `id` column maps back to the `id_` entity attribute.
            entity_dict["id_"] = entity_dict.pop("id")

            entities.append(model(**entity_dict))
        return entities

    def commit(self) -> None:
        """Persist the changes into the repository."""
        self.connection.commit()

    def _search(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Get the entities whose attributes match one or several conditions.

        Particular implementation of the database adapter. String values are
        matched as case insensitive regular expressions, the rest by equality.

        Args:
            model: Entity class to obtain.
            fields: Dictionary with the {key}:{value} to search.

        Returns:
            entities: List of Entity object that matches the search criteria.
        """
        table = self._table_model(model)
        query = Query.from_(table).select("*")

        for key, value in fields.items():
            column = getattr(table, "id" if key == "id_" else key)
            if isinstance(value, str):
                query = query.where(functions.Lower(column).regexp(value.lower()))
            else:
                query = query.where(column == value)

        return self._build_entities(model, query)

    def apply_migrations(self, migrations_directory: str) -> None:
        """Run the migrations of the repository schema.

        If applying the migrations fails, the error is logged and a rollback is
        attempted. Only a failure of the rollback is raised.

        Args:
            migrations_directory: path to the directory containing the migration
                scripts.
        """
        backend = get_backend(self.database_url)
        migrations = read_migrations(migrations_directory)

        with backend.lock():
            log.info("Running database migrations")
            try:
                backend.apply_migrations(backend.to_apply(migrations))
            except Exception as error:  # noqa: W0703
                # We need to add tests for this function and use a less generic
                # exception
                log.error("Error running database migrations")
                log.error(error)

                log.debug("Rolling back the database migrations")
                try:
                    backend.rollback_migrations(backend.to_rollback(migrations))
                except Exception as rollback_error:  # noqa: W0703
                    # We need to add tests for this function and use a less generic
                    # exception
                    log.error("Error rolling back database migrations")
                    log.error(rollback_error)
                    raise rollback_error from error
            log.debug("Complete running database migrations")

    def close(self) -> None:
        """Close the connection to the database."""
        self.connection.close()

    def empty(self) -> None:
        """Remove all entities from the repository."""
        for table in self.tables:
            self._execute(Query.from_(table).delete())

    @property
    def tables(self) -> List[str]:
        """Return the entity tables of the database.

        Tables whose name starts with `_` or `yoyo` are left out.
        """
        # The connection is always opened with sqlite3, so the catalog query is
        # valid whatever the shape of `database_url`. Building it only when the
        # url started with "sqlite://" left `query` unbound for any other url.
        query = "SELECT name FROM sqlite_master WHERE type='table'"

        return [
            row[0]
            for row in self._execute(query).fetchall()
            if not row[0].startswith(("_", "yoyo"))
        ]

    @property
    def is_closed(self) -> bool:
        """Inform if the connection is closed."""
        try:
            self.connection.cursor()
            return False
        except ProgrammingError:
            return True

__init__(database_url='')

Initialize the repository attributes.

Parameters:

Name Type Description Default
database_url str

URL specifying the connection to the database.

''
Source code in repository_orm/adapters/data/pypika.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(
    self,
    database_url: str = "",
) -> None:
    """Create the database file if needed and connect to it.

    Args:
        database_url: URL specifying the connection to the database.

    Raises:
        ConnectionError: If the database file can't be created.
    """
    super().__init__(database_url)
    db_path = database_url.replace("sqlite:///", "")
    if not os.path.isfile(db_path):
        try:
            # Append mode creates the missing file, the context manager
            # closes it on exit.
            with open(db_path, "a", encoding="utf-8"):
                pass
        except FileNotFoundError as missing_directory:
            raise ConnectionError(
                f"Could not create the database file: {db_path}"
            ) from missing_directory
    self.connection = sqlite3.connect(db_path)
    self.connection.create_function("REGEXP", 2, _regexp)
    self.cursor = self.connection.cursor()

apply_migrations(migrations_directory)

Run the migrations of the repository schema.

Parameters:

Name Type Description Default
migrations_directory str

path to the directory containing the migration scripts.

required
Source code in repository_orm/adapters/data/pypika.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def apply_migrations(self, migrations_directory: str) -> None:
    """Apply the pending schema migrations found in a directory.

    A failure while applying them is logged and followed by a rollback
    attempt. Only a failure of that rollback is propagated.

    Args:
        migrations_directory: path to the directory containing the migration
            scripts.
    """
    backend = get_backend(self.database_url)
    migrations = read_migrations(migrations_directory)

    with backend.lock():
        log.info("Running database migrations")
        try:
            pending = backend.to_apply(migrations)
            backend.apply_migrations(pending)
        except Exception as apply_error:  # noqa: W0703
            # We need to add tests for this function and use a less generic
            # exception
            log.error("Error running database migrations")
            log.error(apply_error)

            log.debug("Rolling back the database migrations")
            try:
                applied = backend.to_rollback(migrations)
                backend.rollback_migrations(applied)
            except Exception as rollback_error:  # noqa: W0703
                # We need to add tests for this function and use a less generic
                # exception
                log.error("Error rolling back database migrations")
                log.error(rollback_error)
                raise rollback_error from apply_error
        log.debug("Complete running database migrations")

close()

Close the connection to the database.

Source code in repository_orm/adapters/data/pypika.py
258
259
260
def close(self) -> None:
    """Close the database connection held by the repository."""
    connection = self.connection
    connection.close()

commit()

Persist the changes into the repository.

Source code in repository_orm/adapters/data/pypika.py
192
193
194
def commit(self) -> None:
    """Commit the pending changes of the database connection."""
    connection = self.connection
    connection.commit()

delete(entity)

Delete an entity from the repository.

Parameters:

Name Type Description Default
entity EntityT

Entity to remove from the repository.

required

Raises:

Type Description
EntityNotFoundError

If the entity is not found.

Source code in repository_orm/adapters/data/pypika.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def delete(self, entity: EntityT) -> None:
    """Remove an entity from the repository.

    The entity is looked up first so that a missing one is reported.

    Args:
        entity: Entity to remove from the repository.

    Raises:
        EntityNotFoundError: If the entity is not found.
    """
    table = self._table(entity)
    try:
        self.get(entity.id_, type(entity))
    except EntityNotFoundError as not_found:
        raise EntityNotFoundError(
            f"Unable to delete entity {entity} because it's not in the repository"
        ) from not_found
    delete_query = Query.from_(table).delete().where(table.id == entity.id_)
    self._execute(delete_query)

empty()

Remove all entities from the repository.

Source code in repository_orm/adapters/data/pypika.py
262
263
264
265
def empty(self) -> None:
    """Delete the content of every entity table of the repository."""
    for table_name in self.tables:
        delete_query = Query.from_(table_name).delete()
        self._execute(delete_query)

is_closed() property

Inform if the connection is closed.

Source code in repository_orm/adapters/data/pypika.py
280
281
282
283
284
285
286
287
@property
def is_closed(self) -> bool:
    """Tell whether the database connection is closed.

    The check asks the connection for a cursor: a `ProgrammingError` means
    the connection is closed.
    """
    try:
        self.connection.cursor()
    except ProgrammingError:
        return True
    return False

tables() property

Return the entity tables of the database.

Source code in repository_orm/adapters/data/pypika.py
267
268
269
270
271
272
273
274
275
276
277
278
@property
def tables(self) -> List[str]:
    """Return the entity tables of the database.

    Tables whose name starts with `_` or `yoyo` are left out.
    """
    # The sqlite catalog query is built unconditionally: creating it only when
    # `database_url` started with "sqlite://" left `query` unbound (and raised
    # UnboundLocalError) for any other url.
    query = "SELECT name FROM sqlite_master WHERE type='table'"

    return [
        row[0]
        for row in self._execute(query).fetchall()
        if not row[0].startswith(("_", "yoyo"))
    ]

Repository

Bases: abc.ABC

Gather common methods and define the interface of the repositories.

Attributes:

Name Type Description
database_url

URL specifying the connection to the database.

Source code in repository_orm/adapters/data/abstract.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class Repository(abc.ABC):
    """Define the repository interface and the logic shared by the adapters.

    Attributes:
        database_url: URL specifying the connection to the database.
        cache: Cache where the added and retrieved entities are stored.
    """

    @abc.abstractmethod
    def __init__(
        self,
        database_url: str = "",
    ) -> None:
        """Store the database url and create an empty cache.

        Args:
            database_url: URL specifying the connection to the database.
        """
        self.database_url = database_url
        self.cache = Cache()

    def add(
        self, entities: EntityOrEntitiesT, merge: bool = False
    ) -> EntityOrEntitiesT:
        """Add an entity or a list of entities to the repository.

        Entities with a negative integer id get the next available id. Entities
        that the cache reports as unchanged are not written again.

        Args:
            entities: Entity or entities to add to the repository.
            merge: Whether to merge each entity with the one stored under the
                same id, if there is any.

        Returns:
            entity or entities

        Raises:
            ValueError: If `entities` is neither an entity nor a list.
        """
        if not isinstance(entities, Entity):
            if isinstance(entities, list):
                return [self.add(item, merge) for item in entities]
            raise ValueError("Please add an entity or a list of entities")

        entity = entities

        needs_id = isinstance(entity.id_, int) and entity.id_ < 0
        if needs_id:
            entity.id_ = self.next_id(entity)

        if merge:
            # An entity that is not stored yet has nothing to merge with.
            with suppress(EntityNotFoundError):
                stored_entity = self.get(entity.id_, type(entity))
                entity = stored_entity.merge(entity)

        if not self.cache.entity_has_not_changed(entity):
            entity = self._add(entity)
            self.cache.add(entity)
            return entity

        log.debug(f"Skipping the addition of entity {entity} as it hasn't changed")
        return entity

    @abc.abstractmethod
    def _add(self, entity: EntityT) -> EntityT:
        """Store an entity, implemented by each database adapter.

        Args:
            entity: Entity to add to the repository.

        Returns:
            entity
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete(self, entity: EntityT) -> None:
        """Remove an entity from the repository.

        Args:
            entity: Entity to remove from the repository.
        """
        raise NotImplementedError()

    def get(
        self,
        id_: EntityID,
        model: Type[EntityT],
        attribute: str = "id_",
    ) -> EntityT:
        """Obtain the single entity whose attribute matches a value.

        The entity found is also saved in the cache.

        Args:
            id_: Value that the attribute of the entity must have.
            model: Entity class to obtain.
            attribute: Entity attribute to check, the id by default.

        Returns:
            entity: Entity object that matches the value.

        Raises:
            EntityNotFoundError: If the entity is not found.
            TooManyEntitiesError: If more than one entity was found.
        """
        matches = self._get(value=id_, model=model, attribute=attribute)

        if not matches:
            raise EntityNotFoundError(
                f"There are no entities of type {model.__name__} in the repository "
                f"with {attribute} {id_}."
            )
        if len(matches) > 1:
            raise TooManyEntitiesError(
                f"More than one entity was found with the {attribute} {id_}"
            )

        (entity,) = matches
        entity.clear_defined_values()
        self.cache.add(entity)
        return entity

    @abc.abstractmethod
    def _get(
        self,
        value: EntityID,
        model: Type[EntityT],
        attribute: str = "id_",
    ) -> List[EntityT]:
        """Obtain the entities whose attribute matches a value.

        Implemented by each database adapter.

        Args:
            value: Value of the entity attribute to obtain.
            model: Entity class to obtain.
            attribute: Entity attribute to check, the id by default.

        Returns:
            entities: All entities that match the criteria.
        """
        raise NotImplementedError()

    def all(
        self,
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Return the sorted entities of a model, storing them in the cache.

        Args:
            model: Entity class to obtain.
        """
        entities = list(self._all(model))
        entities.sort()

        for entity in entities:
            entity.clear_defined_values()
            self.cache.add(entity)

        return entities

    @abc.abstractmethod
    def _all(self, model: Type[EntityT]) -> List[EntityT]:
        """Get the entities of a model, implemented by each database adapter.

        Args:
            model: Entity class to obtain.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit(self) -> None:
        """Persist the changes into the repository."""
        raise NotImplementedError()

    def search(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Return the sorted entities that match one or several conditions.

        The entities found are also added to the cache.

        Args:
            fields: Dictionary with the {key}:{value} to search.
            model: Entity class to obtain.

        Returns:
            entities: List of Entity object that matches the search criteria.
        """
        matches = list(self._search(fields, model))
        matches.sort()

        for entity in matches:
            entity.clear_defined_values()
            self.cache.add(entity)

        return matches

    @abc.abstractmethod
    def _search(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Get the entities that match one or several conditions.

        Implemented by each database adapter.

        Args:
            fields: Dictionary with the {key}:{value} to search.
            model: Entity class to obtain.

        Returns:
            entities: List of Entity object that matches the search criteria.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def apply_migrations(self, migrations_directory: str) -> None:
        """Run the migrations of the repository schema.

        Args:
            migrations_directory: path to the directory containing the migration
                scripts.
        """
        raise NotImplementedError()

    def last(
        self,
        model: Type[EntityT],
    ) -> EntityT:
        """Return the biggest entity of a model.

        Args:
            model: Entity class to obtain.

        Returns:
            entity: Biggest Entity object that matches a model.

        Raises:
            EntityNotFoundError: If there are no entities.
        """
        try:
            biggest = max(self.all(model))
        except ValueError as error:
            raise EntityNotFoundError(
                f"There are no entities of type {model.__name__} in the repository."
            ) from error
        return biggest

    def first(
        self,
        model: Type[EntityT],
    ) -> EntityT:
        """Return the smallest entity of a model.

        Args:
            model: Type of entity object to obtain.

        Returns:
            entity: Smallest Entity object that matches a model.

        Raises:
            EntityNotFoundError: If there are no entities.
        """
        try:
            smallest = min(self.all(model))
        except ValueError as error:
            raise EntityNotFoundError(
                f"There are no entities of type {model.__name__} in the repository."
            ) from error
        return smallest

    def next_id(self, entity: EntityT) -> int:
        """Return the id that follows the one of the last entity of the model.

        Args:
            entity: Entity whose model we want to get the next entity id.

        Returns:
            0 if there are no entities of that model, otherwise the last id plus
            one.

        Raises:
            AutoIncrementError: If the id of the last entity is not an integer.
        """
        try:
            last_id = self.last(type(entity)).id_
        except EntityNotFoundError:
            return 0

        if not isinstance(last_id, int):
            raise AutoIncrementError(
                "Auto increment is not yet supported for Entities with string id_s. "
                "Please set the id_ yourself before adding the entities to the "
                "repository."
            )
        return last_id + 1

    @abc.abstractmethod
    def close(self) -> None:
        """Close the connection to the database."""
        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def is_closed(self) -> bool:
        """Inform if the connection is closed."""
        raise NotImplementedError()

    @abc.abstractmethod
    def empty(self) -> None:
        """Remove all entities from the repository."""
        raise NotImplementedError()

__init__(database_url='') abstractmethod

Initialize the repository attributes.

Parameters:

Name Type Description Default
database_url str

URL specifying the connection to the database.

''
Source code in repository_orm/adapters/data/abstract.py
24
25
26
27
28
29
30
31
32
33
34
35
@abc.abstractmethod
def __init__(
    self,
    database_url: str = "",
) -> None:
    """Store the database url and create an empty cache.

    Args:
        database_url: URL specifying the connection to the database.
    """
    self.database_url = database_url
    # Each repository instance gets its own cache.
    self.cache = Cache()

add(entities, merge=False)

Append an entity or list of entities to the repository.

If the id is not set, it will automatically increment the last available one.

If merge is True, each added entity is merged with the one already stored in the repository under the same id, if there is any.

Parameters:

Name Type Description Default
entities EntityOrEntitiesT

Entity or entities to add to the repository.

required

Returns:

Type Description
EntityOrEntitiesT

entity or entities

Source code in repository_orm/adapters/data/abstract.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def add(
    self, entities: EntityOrEntitiesT, merge: bool = False
) -> EntityOrEntitiesT:
    """Add an entity or a list of entities to the repository.

    Entities with a negative integer id get the next available id. Entities
    that the cache reports as unchanged are not written again.

    Args:
        entities: Entity or entities to add to the repository.
        merge: Whether to merge each entity with the one stored under the
            same id, if there is any.

    Returns:
        entity or entities

    Raises:
        ValueError: If `entities` is neither an entity nor a list.
    """
    if not isinstance(entities, Entity):
        if isinstance(entities, list):
            return [self.add(item, merge) for item in entities]
        raise ValueError("Please add an entity or a list of entities")

    entity = entities

    needs_id = isinstance(entity.id_, int) and entity.id_ < 0
    if needs_id:
        entity.id_ = self.next_id(entity)

    if merge:
        # An entity that is not stored yet has nothing to merge with.
        with suppress(EntityNotFoundError):
            stored_entity = self.get(entity.id_, type(entity))
            entity = stored_entity.merge(entity)

    if not self.cache.entity_has_not_changed(entity):
        entity = self._add(entity)
        self.cache.add(entity)
        return entity

    log.debug(f"Skipping the addition of entity {entity} as it hasn't changed")
    return entity

all(model)

Get all the entities from the repository that match a model.

Also store the entities in the cache.

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class or classes to obtain.

required
Source code in repository_orm/adapters/data/abstract.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def all(
    self,
    model: Type[EntityT],
) -> List[EntityT]:
    """Return the sorted entities of a model, storing them in the cache.

    Args:
        model: Entity class to obtain.
    """
    entities = list(self._all(model))
    entities.sort()

    for entity in entities:
        entity.clear_defined_values()
        self.cache.add(entity)

    return entities

apply_migrations(migrations_directory) abstractmethod

Run the migrations of the repository schema.

Parameters:

Name Type Description Default
migrations_directory str

path to the directory containing the migration scripts.

required
Source code in repository_orm/adapters/data/abstract.py
241
242
243
244
245
246
247
248
249
@abc.abstractmethod
def apply_migrations(self, migrations_directory: str) -> None:
    """Run the schema migrations, to be implemented by each adapter.

    Args:
        migrations_directory: path to the directory containing the migration
            scripts.

    Raises:
        NotImplementedError: Always, the method is abstract.
    """
    raise NotImplementedError()

close() abstractmethod

Close the connection to the database.

Source code in repository_orm/adapters/data/abstract.py
313
314
315
316
@abc.abstractmethod
def close(self) -> None:
    """Close the database connection, to be implemented by each adapter.

    Raises:
        NotImplementedError: Always, the method is abstract.
    """
    raise NotImplementedError()

commit() abstractmethod

Persist the changes into the repository.

Source code in repository_orm/adapters/data/abstract.py
193
194
195
196
@abc.abstractmethod
def commit(self) -> None:
    """Persist the pending changes, to be implemented by each adapter.

    Raises:
        NotImplementedError: Always, the method is abstract.
    """
    raise NotImplementedError()

delete(entity) abstractmethod

Delete an entity from the repository.

Parameters:

Name Type Description Default
entity EntityT

Entity to remove from the repository.

required
Source code in repository_orm/adapters/data/abstract.py
 95
 96
 97
 98
 99
100
101
102
@abc.abstractmethod
def delete(self, entity: EntityT) -> None:
    """Remove an entity from the repository, to be implemented by each adapter.

    Args:
        entity: Entity to remove from the repository.

    Raises:
        NotImplementedError: Always, the method is abstract.
    """
    raise NotImplementedError()

empty() abstractmethod

Remove all entities from the repository.

Source code in repository_orm/adapters/data/abstract.py
324
325
326
327
@abc.abstractmethod
def empty(self) -> None:
    """Remove every entity, to be implemented by each adapter.

    Raises:
        NotImplementedError: Always, the method is abstract.
    """
    raise NotImplementedError()

first(model)

Get the smallest entity from the repository.

Parameters:

Name Type Description Default
model Type[EntityT]

Type of entity object to obtain.

required

Returns:

Name Type Description
entity EntityT

Smallest Entity object that matches a model.

Raises:

Type Description
EntityNotFoundError

If there are no entities.

Source code in repository_orm/adapters/data/abstract.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def first(
    self,
    model: Type[EntityT],
) -> EntityT:
    """Return the smallest entity of a model.

    Args:
        model: Type of entity object to obtain.

    Returns:
        entity: Smallest Entity object that matches a model.

    Raises:
        EntityNotFoundError: If there are no entities.
    """
    try:
        smallest = min(self.all(model))
    except ValueError as error:
        # `min` raises ValueError when there are no entities to compare.
        raise EntityNotFoundError(
            f"There are no entities of type {model.__name__} in the repository."
        ) from error
    return smallest

get(id_, model, attribute='id_')

Obtain an entity from the repository by its ID.

Also save the entity in the cache

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class to obtain.

required
id_ EntityID

ID of the entity to obtain.

required

Returns:

Name Type Description
entity EntityT

Entity object that matches the id_

Raises:

Type Description
EntityNotFoundError

If the entity is not found.

TooManyEntitiesError

If more than one entity was found.

Source code in repository_orm/adapters/data/abstract.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def get(
    self,
    id_: EntityID,
    model: Type[EntityT],
    attribute: str = "id_",
) -> EntityT:
    """Fetch the single entity whose attribute equals the given value.

    The entity found is also stored in the cache.

    Args:
        id_: Value to look for, by default the ID of the entity.
        model: Entity class to obtain.
        attribute: Name of the entity attribute compared against `id_`.

    Returns:
        entity: Entity object that matches the id_

    Raises:
        EntityNotFoundError: If the entity is not found.
        TooManyEntitiesError: If more than one entity was found.
    """
    matches = self._get(value=id_, model=model, attribute=attribute)
    total = len(matches)

    if total == 0:
        raise EntityNotFoundError(
            f"There are no entities of type {model.__name__} in the repository "
            f"with {attribute} {id_}."
        )
    if total > 1:
        raise TooManyEntitiesError(
            f"More than one entity was found with the {attribute} {id_}"
        )

    # Exactly one match: reset its defined values before caching it.
    found = matches[0]
    found.clear_defined_values()
    self.cache.add(found)
    return found

is_closed() abstractmethod property

Inform if the connection is closed.

Source code in repository_orm/adapters/data/abstract.py
318
319
320
321
322
@property
@abc.abstractmethod
def is_closed(self) -> bool:
    """Inform if the connection is closed.

    Abstract property: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError

last(model)

Get the biggest entity from the repository.

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class to obtain.

required

Returns:

Name Type Description
entity EntityT

Biggest Entity object that matches a model.

Raises:

Type Description
EntityNotFoundError

If there are no entities.

Source code in repository_orm/adapters/data/abstract.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def last(
    self,
    model: Type[EntityT],
) -> EntityT:
    """Return the maximum entity of a model stored in the repository.

    The entities returned by `self.all(model)` are compared with `max`.

    Args:
        model: Entity class to obtain.

    Returns:
        entity: Biggest Entity object that matches a model.

    Raises:
        EntityNotFoundError: If there are no entities.
    """
    try:
        biggest = max(self.all(model))
    except ValueError as error:
        # max() raises ValueError when it receives an empty iterable.
        not_found = EntityNotFoundError(
            f"There are no entities of type {model.__name__} in the repository."
        )
        raise not_found from error
    return biggest

next_id(entity)

Return one id unit more than the last entity id in the repository index.

Parameters:

Name Type Description Default
entity EntityT

Entity whose model we want to get the next entity id.

required
Source code in repository_orm/adapters/data/abstract.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def next_id(self, entity: EntityT) -> int:
    """Compute the id that follows the last entity id of the entity's model.

    Args:
        entity: Entity whose model we want to get the next entity id.

    Returns:
        0 if there are no entities of that model, otherwise the last id plus one.

    Raises:
        AutoIncrementError: If the id of the last entity is not an integer.
    """
    try:
        latest_id = self.last(type(entity)).id_
    except EntityNotFoundError:
        # No entities of this model yet, so the numbering starts at zero.
        return 0

    if not isinstance(latest_id, int):
        raise AutoIncrementError(
            "Auto increment is not yet supported for Entities with string id_s. "
            "Please set the id_ yourself before adding the entities to the "
            "repository."
        )
    return latest_id + 1

search(fields, model)

Get the entities whose attributes match one or several conditions.

Also add the found entities to the cache.

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class to obtain.

required
fields Dict[str, EntityID]

Dictionary with the {key}:{value} to search.

required

Returns:

Name Type Description
entities List[EntityT]

List of Entity objects that match the search criteria.

Source code in repository_orm/adapters/data/abstract.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def search(
    self,
    fields: Dict[str, EntityID],
    model: Type[EntityT],
) -> List[EntityT]:
    """Return the sorted entities whose attributes match the given conditions.

    Every entity found is also added to the cache.

    Args:
        fields: Dictionary with the {key}:{value} to search.
        model: Entity class to obtain.

    Returns:
        entities: Sorted list of the Entity objects that match the search criteria.
    """
    matches = list(self._search(fields, model))
    matches.sort()

    for match in matches:
        # Reset the defined values of each result before caching it.
        match.clear_defined_values()
        self.cache.add(match)

    return matches

TinyDBRepository

Bases: Repository

Implement the repository pattern using the TinyDB.

Source code in repository_orm/adapters/data/tinydb.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
class TinyDBRepository(Repository):
    """Implement the repository pattern using the TinyDB.

    Additions and removals are staged in memory (`self.staged`) and are only
    written to the database when `commit` is called.
    """

    def __init__(
        self,
        database_url: str = "",
    ) -> None:
        """Initialize the repository attributes.

        Args:
            database_url: URL specifying the connection to the database.

        Raises:
            ConnectionError: If creating the missing database file fails with a
                FileNotFoundError.
        """
        super().__init__(database_url)
        self.database_file = os.path.expanduser(database_url.replace("tinydb://", ""))
        if not os.path.isfile(self.database_file):
            try:
                # Opening in append mode creates the file if it's missing; the
                # context manager closes the handle, no explicit close is needed.
                with open(self.database_file, "a", encoding="utf-8"):
                    pass
            except FileNotFoundError as error:
                raise ConnectionError(
                    f"Could not create the database file: {self.database_file}"
                ) from error

        serialization = SerializationMiddleware(JSONStorage)
        serialization.register_serializer(DateTimeSerializer(), "TinyDate")

        self.db_ = TinyDB(
            self.database_file, storage=serialization, sort_keys=True, indent=4
        )
        self.staged: Dict[str, List[Any]] = {"add": [], "remove": []}

    def _add(self, entity: EntityT) -> EntityT:
        """Stage an entity to be added to the repository on the next commit.

        Args:
            entity: Entity to add to the repository.

        Returns:
            entity
        """
        self.staged["add"].append(entity)

        return entity

    def delete(self, entity: EntityT) -> None:
        """Stage an entity to be removed from the repository on the next commit.

        Args:
            entity: Entity to remove from the repository.

        Raises:
            EntityNotFoundError: If the entity is not in the repository.
        """
        try:
            self.get(entity.id_, type(entity))
        except EntityNotFoundError as error:
            raise EntityNotFoundError(
                f"Unable to delete entity {entity} because it's not in the repository"
            ) from error
        self.staged["remove"].append(entity)

    def _get(
        self,
        value: EntityID,
        model: Type[EntityT],
        attribute: str = "id_",
    ) -> List[EntityT]:
        """Obtain all entities from the repository that match an id_.

        If the attribute argument is passed, check that attribute instead.

        Args:
            value: Value of the entity attribute to obtain.
            model: Entity class to obtain.
            attribute: Entity attribute to check.

        Returns:
            entities: All entities that match the criteria.
        """
        model_query = Query().model_type_ == model.__name__.lower()

        matching_entities_data = self.db_.search(
            (Query()[attribute] == value) & (model_query)
        )

        return [
            self._build_entity(entity_data, model)
            for entity_data in matching_entities_data
        ]

    @staticmethod
    def _build_entity(
        entity_data: Dict[Any, Any],
        model: Type[EntityT],
    ) -> EntityT:
        """Create an entity from the data stored in a row of the database.

        Args:
            entity_data: Dictionary with the attributes of the entity.
            model: Type of entity object to obtain.

        Returns:
            entity: Built Entity.

        Raises:
            ValidationError: If the stored data can't be parsed by the model. The
                error is logged before being re-raised.
        """
        # If we don't copy the data, the all method stop being idempotent.
        entity_data = entity_data.copy()
        try:
            return model.parse_obj(entity_data)
        except ValidationError:
            log.error(
                f"Error loading the model {model.__name__} "
                f"for the register {entity_data}"
            )
            # Bare raise keeps the original traceback untouched.
            raise

    def _all(self, model: Type[EntityT]) -> List[EntityT]:
        """Get all the entities from the repository that match a model.

        Particular implementation of the database adapter.

        Args:
            model: Entity class to obtain.

        Returns:
            entities: Entities built from every stored row of that model.
        """
        query = Query().model_type_ == model.__name__.lower()

        return [
            self._build_entity(entity_data, model)
            for entity_data in self.db_.search(query)
        ]

    @staticmethod
    def _export_entity(entity: EntityT) -> Dict[Any, Any]:
        """Export the attributes of the entity appending the required by TinyDB.

        Args:
            entity: Entity to export.

        Returns:
            entity_data: Dictionary with the attributes of the entity.
        """
        entity_data = entity.dict()
        # The lowercased model name is what the queries use to tell models apart.
        entity_data["model_type_"] = entity.model_name.lower()

        return entity_data

    def commit(self) -> None:
        """Persist the changes into the repository.

        Staged additions are upserted first, then staged removals are applied.
        Both staging lists are emptied afterwards.
        """
        for entity in self.staged["add"]:
            self.db_.upsert(
                self._export_entity(entity),
                (Query().model_type_ == entity.model_name.lower())
                & (Query().id_ == entity.id_),
            )
        self.staged["add"].clear()

        for entity in self.staged["remove"]:
            self.db_.remove(
                (Query().model_type_ == entity.model_name.lower())
                & (Query().id_ == entity.id_)
            )
        self.staged["remove"].clear()

    def _search(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> List[EntityT]:
        """Get the entities whose attributes match one or several conditions.

        Particular implementation of the database adapter.

        Args:
            model: Entity class to obtain.
            fields: Dictionary with the {key}:{value} to search.

        Returns:
            entities: List of Entity object that matches the search criteria.
        """
        try:
            query = self._build_search_query(fields, model)
        except EntityNotFoundError:
            # No usable search field, so nothing can match.
            return []

        return [
            self._build_entity(entity_data, model)
            for entity_data in self.db_.search(query)
        ]

    def _build_search_query(
        self,
        fields: Dict[str, EntityID],
        model: Type[EntityT],
    ) -> QueryInstance:
        """Build the Query parts for a repository search.

        If the field type is a list, change the query accordingly. Fields that
        are not part of the model schema are ignored.

        Args:
            model: Type of entity object to obtain.
            fields: Dictionary with the {key}:{value} to search.

        Returns:
            Query based on the type of model and fields.

        Raises:
            EntityNotFoundError: If none of the fields belongs to the model schema.
        """
        query_parts = []

        schema = model.schema()["properties"]
        for field, value in fields.items():
            if field not in schema:
                continue

            # Not every schema property defines a "type" key, hence the `.get`.
            if schema[field].get("type") == "array":
                field_query = Query()[field].test(_regexp_in_list, value)
            elif isinstance(value, str):
                field_query = Query()[field].search(value, flags=re.IGNORECASE)
            else:
                field_query = Query()[field] == value

            query_parts.append(
                (Query().model_type_ == model.__name__.lower()) & (field_query)
            )

        if not query_parts:
            raise EntityNotFoundError(
                f"There are no entities of type {model.__name__} in the repository "
                f" that match the search filter {fields}"
            )

        return self._merge_query(query_parts, mode="and")

    @staticmethod
    def _merge_query(
        query_parts: List[QueryInstance], mode: str = "and"
    ) -> QueryInstance:
        """Join all the query parts into a query.

        Args:
            query_parts: List of queries to concatenate. It must not be empty.
            mode: "and" or "or" for the join method. Any value other than "and"
                joins the parts with "or".

        Returns:
            A query object that joins all parts.
        """
        query = query_parts[0]

        for query_part in query_parts[1:]:
            query = query & query_part if mode == "and" else query | query_part

        return query

    def apply_migrations(self, migrations_directory: str) -> None:
        """Run the migrations of the repository schema.

        Args:
            migrations_directory: path to the directory containing the migration
                scripts.

        Raises:
            NotImplementedError: Always, this method is not implemented here.
        """
        raise NotImplementedError

    def last(
        self,
        model: Type[EntityT],
    ) -> EntityT:
        """Get the biggest entity from the repository.

        Entities staged to be added but not yet commited are also considered.

        Args:
            model: Entity class to obtain.

        Returns:
            entity: Biggest Entity object that matches a model.

        Raises:
            EntityNotFoundError: If there are no entities.
        """
        staged_entities = [
            entity for entity in self.staged["add"] if entity.__class__ == model
        ]

        try:
            last_index_entity = super().last(model)
        except EntityNotFoundError:
            if not staged_entities:
                # Empty repo and no entities staged.
                raise
            # Empty repo but entities staged to be commited.
            return max(staged_entities)

        if not staged_entities:
            # Full repo and no staged entities.
            return last_index_entity

        # Full repo and staged entities.
        return max([last_index_entity, max(staged_entities)])

    def close(self) -> None:
        """Close the connection to the database."""
        self.db_.close()

    @property
    def is_closed(self) -> bool:
        """Inform if the connection is closed.

        The connection is considered closed when listing the database tables
        raises a ValueError.
        """
        try:
            self.db_.tables()
        except ValueError:
            return True
        return False

    def empty(self) -> None:
        """Remove all entities from the repository.

        Only the database is truncated, the staged entities are left untouched.
        """
        self.db_.truncate()

__init__(database_url='')

Initialize the repository attributes.

Parameters:

Name Type Description Default
database_url str

URL specifying the connection to the database.

''
Source code in repository_orm/adapters/data/tinydb.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    database_url: str = "",
) -> None:
    """Initialize the repository attributes.

    The database file is created if it doesn't exist yet, then the TinyDB
    connection and the empty staging lists are set up.

    Args:
        database_url: URL specifying the connection to the database.

    Raises:
        ConnectionError: If creating the missing database file fails with a
            FileNotFoundError.
    """
    super().__init__(database_url)
    self.database_file = os.path.expanduser(database_url.replace("tinydb://", ""))
    if not os.path.isfile(self.database_file):
        try:
            # Opening in append mode creates the file if it's missing; the
            # context manager closes the handle, no explicit close is needed.
            with open(self.database_file, "a", encoding="utf-8"):
                pass
        except FileNotFoundError as error:
            raise ConnectionError(
                f"Could not create the database file: {self.database_file}"
            ) from error

    serialization = SerializationMiddleware(JSONStorage)
    serialization.register_serializer(DateTimeSerializer(), "TinyDate")

    self.db_ = TinyDB(
        self.database_file, storage=serialization, sort_keys=True, indent=4
    )
    self.staged: Dict[str, List[Any]] = {"add": [], "remove": []}

apply_migrations(migrations_directory)

Run the migrations of the repository schema.

Parameters:

Name Type Description Default
migrations_directory str

path to the directory containing the migration scripts.

required
Source code in repository_orm/adapters/data/tinydb.py
289
290
291
292
293
294
295
296
def apply_migrations(self, migrations_directory: str) -> None:
    """Run the migrations of the repository schema.

    Args:
        migrations_directory: path to the directory containing the migration
            scripts.

    Raises:
        NotImplementedError: Always, this method is not implemented here.
    """
    raise NotImplementedError

close()

Close the connection to the database.

Source code in repository_orm/adapters/data/tinydb.py
336
337
338
def close(self) -> None:
    """Close the connection to the database.

    Delegates to the `close` method of `self.db_`.
    """
    self.db_.close()

commit()

Persist the changes into the repository.

Source code in repository_orm/adapters/data/tinydb.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def commit(self) -> None:
    """Write the staged changes to the database.

    Staged additions are upserted first, then staged removals are applied.
    Both staging lists are emptied afterwards.
    """
    additions = self.staged["add"]
    for staged_entity in additions:
        exported = self._export_entity(staged_entity)
        # Match the stored row of the same model with the same id.
        same_entity = (Query().model_type_ == staged_entity.model_name.lower()) & (
            Query().id_ == staged_entity.id_
        )
        self.db_.upsert(exported, same_entity)
    additions.clear()

    removals = self.staged["remove"]
    for staged_entity in removals:
        same_entity = (Query().model_type_ == staged_entity.model_name.lower()) & (
            Query().id_ == staged_entity.id_
        )
        self.db_.remove(same_entity)
    removals.clear()

delete(entity)

Delete an entity from the repository.

Parameters:

Name Type Description Default
entity EntityT

Entity to remove from the repository.

required
Source code in repository_orm/adapters/data/tinydb.py
69
70
71
72
73
74
75
76
77
78
79
80
81
def delete(self, entity: EntityT) -> None:
    """Stage an entity to be removed from the repository.

    The entity is first fetched with `get` to check that it exists.

    Args:
        entity: Entity to remove from the repository.

    Raises:
        EntityNotFoundError: If the entity is not in the repository.
    """
    entity_model = type(entity)
    try:
        self.get(entity.id_, entity_model)
    except EntityNotFoundError as error:
        missing = EntityNotFoundError(
            f"Unable to delete entity {entity} because it's not in the repository"
        )
        raise missing from error
    else:
        self.staged["remove"].append(entity)

empty()

Remove all entities from the repository.

Source code in repository_orm/adapters/data/tinydb.py
349
350
351
def empty(self) -> None:
    """Remove all entities from the repository.

    Delegates to the `truncate` method of `self.db_`; the staged entities are
    left untouched.
    """
    self.db_.truncate()

is_closed() property

Inform if the connection is closed.

Source code in repository_orm/adapters/data/tinydb.py
340
341
342
343
344
345
346
347
@property
def is_closed(self) -> bool:
    """Tell whether the connection to the database is closed.

    The connection is considered closed when listing the database tables
    raises a ValueError.
    """
    try:
        self.db_.tables()
    except ValueError:
        return True
    return False

last(model)

Get the biggest entity from the repository.

Parameters:

Name Type Description Default
model Type[EntityT]

Entity class to obtain.

required

Returns:

Name Type Description
entity EntityT

Biggest Entity object that matches a model.

Raises:

Type Description
EntityNotFoundError

If there are no entities.

Source code in repository_orm/adapters/data/tinydb.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def last(
    self,
    model: Type[EntityT],
) -> EntityT:
    """Get the biggest entity from the repository.

    Entities staged to be added but not yet commited are also considered.

    Args:
        model: Entity class to obtain.

    Returns:
        entity: Biggest Entity object that matches a model.

    Raises:
        EntityNotFoundError: If there are no entities.
    """
    # Collect the staged entities of the model once instead of scanning the
    # staging list in each branch.
    staged_entities = [
        entity for entity in self.staged["add"] if entity.__class__ == model
    ]

    try:
        last_index_entity = super().last(model)
    except EntityNotFoundError:
        if not staged_entities:
            # Empty repo and no entities staged.
            raise
        # Empty repo but entities staged to be commited.
        return max(staged_entities)

    if not staged_entities:
        # Full repo and no staged entities.
        return last_index_entity

    # Full repo and staged entities.
    return max([last_index_entity, max(staged_entities)])

load_file_repository(url='local:.')

Load the FileRepository object that matches the url protocol.

Parameters:

Name Type Description Default
url str

Url to connect to the storage backend.

'local:.'

Returns:

Type Description
FileRepository[AnyStr]

File Repository that understands the url protocol.

Source code in repository_orm/services.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def load_file_repository(url: str = "local:.") -> "FileRepository[AnyStr]":
    """Load the FileRepository object that matches the url protocol.

    Args:
        url: Url to connect to the storage backend. For the `local:` protocol
            everything after the first colon is used as the working directory.

    Returns:
        File Repository that understands the url protocol.

    Raises:
        ValueError: If the url protocol is not recognized.
    """
    if "local:" in url:
        # Split only once so that paths containing colons (for example
        # `local:C:/data`) are not truncated at the second colon.
        return LocalFileRepository(workdir=url.split(":", 1)[1])

    raise ValueError(f"File Repository URL: {url} not recognized.")

load_repository(database_url='fake://')

Load the Repository object that matches the database url protocol.

Parameters:

Name Type Description Default
database_url str

Url to connect to the storage backend.

'fake://'

Returns:

Type Description
Repository

Repository that understands the url protocol.

Source code in repository_orm/services.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def load_repository(
    database_url: str = "fake://",
) -> Repository:
    """Build the Repository that handles the protocol of the database url.

    The protocols are checked in this order: `fake://`, `sqlite://` and
    `tinydb://`. The first one contained in the url decides the repository.

    Args:
        database_url: Url to connect to the storage backend.

    Returns:
        Repository that understands the url protocol.

    Raises:
        ValueError: If the url doesn't contain any of the known protocols.
    """
    # The builders are lazy so that only the matching repository is created.
    builders = (
        ("fake://", lambda: FakeRepository("")),
        ("sqlite://", lambda: PypikaRepository(database_url)),
        ("tinydb://", lambda: TinyDBRepository(database_url)),
    )
    for protocol, build in builders:
        if protocol in database_url:
            return build()

    raise ValueError(f"Database URL: {database_url} not recognized.")

Last update: 2021-04-08