
FileSystems

FileSystemAwsS3

Manage files on AWS S3.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
source_bucket | str | Name of the bucket that contains the original files. | required
target_bucket | str | Name of the bucket that should contain the new files. If the bucket does not already exist an error is raised; it will not be created automatically. Defaults to the same as source_bucket. | None
source_dir | str | Directory path from which to start looking for files. Defaults to the source_bucket root. | None
target_dir | str | Directory path to which the new files should be written. Defaults to the target_bucket root. | None
recursive | bool | Whether to look for files recursively. Defaults to False. | False
aws_access_key_id | str | The access key to use when creating the S3 resource. If not provided, the credentials configured for the session are used. | None
aws_secret_access_key | str | The secret key to use when creating the S3 resource. Same semantics as aws_access_key_id. | None
region_name | str | The name of the region associated with the resource. If not provided, it is obtained from the default boto3 session. | None
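
For orientation, a minimal usage sketch. The import path is inferred from the source location shown below; the bucket and prefix names are placeholders, and credentials are assumed to come from the configured boto3 session.

from renameit.file_systems.aws import FileSystemAwsS3  # path inferred from the source location below

# Placeholders: replace with your own buckets and prefixes.
fs = FileSystemAwsS3(
    source_bucket="my-source-bucket",
    target_bucket="my-target-bucket",  # must already exist
    source_dir="raw/2023",
    target_dir="renamed/2023",
    recursive=True,
)

# Copy every file found under source_dir to the same relative path under target_dir.
for obj in fs.list_objects():
    fs.copy_object(obj, obj)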
Source code in renameit/file_systems/aws.py
class FileSystemAwsS3(IFileSystem):
    """Manage files on aws s3.

    Args:
        source_bucket (str): Name of bucket that contains the original files.
        target_bucket (str, optional): Name of bucket that should contain the new files.
            If bucket does not already exist an error will be raised, it will not
            be created automatically. Defaults to be the same as source_bucket.
        source_dir (str, optional): Directory path from where to start looking for files.
            If not provided defaults to source_bucket root.
        target_dir (str, optional): Directory path to where should the new files be written.
            If not provided defaults to target_bucket root.
        recursive (bool, optional): Whether to look for files recursively.
            Defaults to False.
        aws_access_key_id (str, optional): The access key to use when creating
            the s3 resource. If not provided, the credentials configured for the session
            will be used.
        aws_secret_access_key (str, optional): The secret key to use when creating
            the s3 resource. Same semantics as aws_access_key_id above.
        region_name (str, optional):  The name of the region associated with the resource.
            If not provided, it will be obtained from the default boto3 session.
    """

    name = "aws_s3"

    def __init__(
        self,
        source_bucket: str,
        target_bucket: str = None,
        source_dir: str = None,
        target_dir: str = None,
        recursive: bool = False,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
        region_name: str = None,
    ):
        super().__init__()

        self.source_bucket = source_bucket
        self.target_bucket = target_bucket or source_bucket
        self.source_dir = source_dir
        self.target_dir = target_dir
        self.recursive = recursive
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.region_name = region_name or boto3.session.Session().region_name

        self.s3 = boto3.resource(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region_name=self.region_name,
        )

        self.s3_client = self.s3.meta.client

        # TODO try to create it with the same configs as source_bucket
        # or using some user input.
        if self.s3.Bucket(self.target_bucket).creation_date is None:
            raise TargetContainerNotExistError(
                f"Target bucket `{self.target_bucket}` does not exist "
                + "or you do not have enough permissions to access it."
            )

    @property
    def source_dir(self):
        return self._source_dir

    @source_dir.setter
    def source_dir(self, value):
        self._source_dir = add_slash(value)

    @property
    def target_dir(self):
        return self._target_dir

    @target_dir.setter
    def target_dir(self, value):
        self._target_dir = add_slash(value)

    def list_objects(self) -> Iterable[FileObject]:
        continuation_token = None
        kwargs = {
            "Bucket": self.source_bucket,
            "Prefix": self.source_dir,
            "StartAfter": self.source_dir,
        }

        # This will group all subdirectories in the response's CommonPrefixes,
        # thus their files will not be included in the response's Contents.
        if not self.recursive:
            kwargs["Delimiter"] = "/"

        while True:

            if continuation_token:
                kwargs["ContinuationToken"] = continuation_token

            response = self.s3_client.list_objects_v2(**kwargs)

            for obj in response["Contents"]:
                yield FileObject(
                    path=obj["Key"].removeprefix(self.source_dir),
                    modified_date=obj["LastModified"],
                )

            if response["IsTruncated"]:
                continuation_token = response["NextContinuationToken"]
            else:
                break

    def copy_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        print(self.source_dir + source_obj.path, "->", self.target_dir + target_obj.path)

        copy_source = {"Bucket": self.source_bucket, "Key": self.source_dir + source_obj.path}
        s3_target_bucket = self.s3.Bucket(self.target_bucket)
        s3_target_bucket.copy(copy_source, self.target_dir + target_obj.path)

FileSystemAzureDatalakeStorage

Manage files on Azure Data Lake Storage Gen2.

Supports multiple authentication methods:
  • Storage account name and access keys
  • Storage account connection strings
  • Service principal with RBAC assignments

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
source_container | str | Name of the storage account container with the original files. | required
target_container | str | Name of the storage account container that should contain the new files. Defaults to the same as source_container and is created automatically if it doesn't exist. | None
source_dir | str | Directory path from which to start looking for files. Defaults to the source_container root. | None
target_dir | str | Directory path to which the new files should be written. Defaults to the target_container root. | None
secure_transfer | bool | Whether to use HTTPS or not. Uses HTTPS by default. | True
recursive | bool | Whether to look for files recursively. Defaults to True. | True
connection_string | str | The storage account connection string. | None
storage_account_name | str | Storage account name. | None
storage_account_key | str | Storage account access key. | None
tenant_id | str | Directory (tenant) ID of the service principal (app). | None
client_id | str | Application (client) ID of the service principal (app). | None
client_secret | str | The app password used by the service principal to authenticate itself. | None
account_url | str | Provide if the storage account is behind a custom domain name. Defaults to <protocol>://<storage_account_name>.dfs.core.windows.net. | None
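
A minimal usage sketch showing two of the supported authentication methods. The import path is inferred from the source location shown below; the container names, connection string, and service principal values are placeholders.

from renameit.file_systems.azure import FileSystemAzureDatalakeStorage  # path inferred from the source location below

# Option 1: authenticate with a storage account connection string (placeholder value).
fs = FileSystemAzureDatalakeStorage(
    source_container="raw",
    target_container="renamed",  # created automatically if it doesn't exist
    connection_string="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...",
)

# Option 2: authenticate with a service principal that has RBAC assignments (placeholder values).
fs = FileSystemAzureDatalakeStorage(
    source_container="raw",
    target_container="renamed",
    storage_account_name="mystorageaccount",
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>",
)

# Rename every file under source_dir to the same relative path under target_dir.
for obj in fs.list_objects():
    fs.rename_object(obj, obj)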
Source code in renameit/file_systems/azure.py
class FileSystemAzureDatalakeStorage(IFileSystem):
    """Manage files on azure datalake storage gen2.

    Supports multiple authentication methods:

      - Storage account name and access keys
      - Storage account connection strings
      - Service principal with RBAC assignments

    Args:
        source_container (str): Name of storage account container with the original files.
        target_container (str): Name of storage account container that should contain new files.
            Defaults to be the same as source_container and it will be created automatically if it
            doesn't exist.
        source_dir (str, optional): Directory path from where to start looking for files.
            If not provided defaults to source_container root.
        target_dir (str, optional): Directory path to where should the new files be written.
            If not provided defaults to target_container root.
        secure_transfer (bool, optional): Whether to use https or not. Uses https by default.
        recursive (bool, optional): Whether to look for files recursively.
            Defaults to True.
        connection_string (str, optional): The storage account connection string.
        storage_account_name (str, optional): Storage account name.
        storage_account_key (str, optional): Storage account access key.
        tenant_id (str, optional): Directory (tenant) ID of the service principal (app).
        client_id (str, optional): Application (client) ID of the service principal (app).
        client_secret (str, optional): The app password that it uses to authenticate itself.
        account_url (str, optional): Provide if the storage account is behind a custom
            domain name. Defaults to <protocol>://<storage_account_name>.dfs.core.windows.net.
    """

    name = "azure_datalake_storage"

    def __init__(
        self,
        source_container: str,
        target_container: str = None,
        source_dir: str = None,
        target_dir: str = None,
        secure_transfer: bool = True,
        recursive: bool = True,
        connection_string: str = None,
        storage_account_name: str = None,
        storage_account_key: str = None,
        tenant_id: str = None,
        client_id: str = None,
        client_secret: str = None,
        account_url: str = None,
    ):

        super().__init__()

        self.source_container = source_container
        self.target_container = target_container or source_container
        self.source_dir = source_dir
        self.target_dir = target_dir
        self.protocol = "https" if secure_transfer else "http"
        self.recursive = recursive

        self.connection_string = connection_string
        self.storage_account_name = storage_account_name
        self.storage_account_key = storage_account_key
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

        self.account_url = (
            account_url or f"{self.protocol}://{self.storage_account_name}.dfs.core.windows.net"
        )

        self.service_client = self._get_service_client()
        self.source_file_system_client = self.service_client.get_file_system_client(
            file_system=self.source_container
        )
        self.target_file_system_client = self.service_client.get_file_system_client(
            file_system=self.target_container
        )

        if not self.target_file_system_client.exists():
            self.target_file_system_client.create_file_system()

    def _get_service_client(self) -> DataLakeServiceClient:
        if self.connection_string:
            return DataLakeServiceClient.from_connection_string(self.connection_string)
        elif self.storage_account_name:

            credential: Union[ClientSecretCredential, str]

            if self.storage_account_key:
                credential = self.storage_account_key
            elif self.tenant_id and self.client_id and self.client_secret:
                credential = ClientSecretCredential(
                    tenant_id=self.tenant_id,
                    client_id=self.client_id,
                    client_secret=self.client_secret,
                )
            else:
                raise ValueError(
                    "Not enough credentials provided in order to connect to azure storage service."
                )

            return DataLakeServiceClient(account_url=self.account_url, credential=credential)

        else:
            raise ValueError("Either 'connection_string' or 'storage_account_name' is required.")

    def list_objects(self) -> Iterable[FileObject]:
        paths = self.source_file_system_client.get_paths(
            path=self.source_dir, recursive=self.recursive
        )

        for path in paths:
            if not path.is_directory:
                yield FileObject(
                    path=path.name.removeprefix(add_slash(self.source_dir)),
                    modified_date=path.last_modified,
                )

    def rename_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        logging.info(
            "Renaming: "
            + f"{self.account_url}/{self.source_container}/"
            + f"{add_slash(self.source_dir)}{source_obj.path}"
            + " to "
            + f"{self.account_url}/{self.target_container}/"
            + f"{add_slash(self.target_dir)}{target_obj.path}"
        )

        full_target_path = add_slash(self.target_dir) + target_obj.path

        self.target_file_system_client.create_file(full_target_path)

        self.source_file_system_client.get_file_client(
            file_path=add_slash(self.source_dir) + source_obj.path
        ).rename_file(new_name=self.target_container + "/" + full_target_path)

FileSystemGoogleCloudStorage

Manage files on Google Cloud Storage.

Authenticating to Google Cloud Storage uses service account credentials, which are looked for in the following order:

  • The service_account_json argument
  • The service_account_info argument
  • The GOOGLE_APPLICATION_CREDENTIALS environment variable, which points to a JSON file that contains the credentials
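
For the last option, a short sketch of relying on the environment variable; the key file path is a placeholder.

import os

# Point GOOGLE_APPLICATION_CREDENTIALS at a service account key file (placeholder path)
# before constructing FileSystemGoogleCloudStorage without explicit credentials.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"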

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
source_bucket | str | Name of the bucket that contains the original files. | required
target_bucket | str | Name of the bucket that should contain the new files. If the bucket does not already exist an error is raised; it will not be created automatically. Defaults to the same as source_bucket. | None
source_dir | str | Directory path from which to start looking for files. Defaults to the source_bucket root. | None
target_dir | str | Directory path to which the new files should be written. Defaults to the target_bucket root. | None
recursive | bool | Whether to look for files recursively. Defaults to False. | False
service_account_json | str | File path to a JSON file that contains service account credentials. | None
service_account_info | Union[Dict[str, str], str] | Service account credentials as a dict or as a string representing a JSON object. | None
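
A minimal usage sketch. The import path is inferred from the source location shown below; the bucket names, prefixes, and key file path are placeholders.

from renameit.file_systems.google import FileSystemGoogleCloudStorage  # path inferred from the source location below

fs = FileSystemGoogleCloudStorage(
    source_bucket="my-source-bucket",
    target_bucket="my-target-bucket",  # must already exist
    source_dir="raw/2023",
    target_dir="renamed/2023",
    service_account_json="/path/to/service-account.json",  # placeholder key file
    recursive=True,
)

# Rename every file under source_dir to the same relative path under target_dir.
for obj in fs.list_objects():
    fs.rename_object(obj, obj)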
Source code in renameit/file_systems/google.py
class FileSystemGoogleCloudStorage(IFileSystem):
    """Manage files on google cloud storage.

    Authenticating to google cloud storage uses service account credentials
    and looks for them in the following ordered places:

    - Checks `service_account_json` argument
    - Checks `service_account_info` argument
    - Environment variable called `GOOGLE_APPLICATION_CREDENTIALS`
        that points to json file that contains the credentials

    Args:
        source_bucket (str): Name of bucket that contains the original files.
        target_bucket (str, optional): Name of bucket that should contain the new files.
            If bucket does not already exist an error will be raised, it will not
            be created automatically. Defaults to be the same as source_bucket.
        source_dir (str, optional): Directory path from where to start looking for files.
            If not provided defaults to source_bucket root.
        target_dir (str, optional): Directory path to where should the new files be written.
            If not provided defaults to target_bucket root.
        recursive (bool, optional): Whether to look for files recursively.
            Defaults to False.
        service_account_json (str, optional): File path to json file that contains service
            account credentials.
        service_account_info (Union[Dict[str, str], str], optional): Service account
            credentials as dict or a string representing a json object.
    """

    name = "google_cloud_storage"

    def __init__(
        self,
        source_bucket: str,
        target_bucket: str = None,
        source_dir: str = None,
        target_dir: str = None,
        recursive: bool = False,
        service_account_json: str = None,
        service_account_info: Union[Dict[str, str], str] = None,
    ):
        super().__init__()
        self.source_bucket = source_bucket
        self.target_bucket = target_bucket or source_bucket
        self.source_dir = source_dir
        self.target_dir = target_dir
        self.recursive = recursive
        self.service_account_json = service_account_json
        self.service_account_info = (
            json.loads(service_account_info)
            if isinstance(service_account_info, str)
            else service_account_info
        )

        if self.service_account_json:
            self.storage_client = storage.Client.from_service_account_json(
                self.service_account_json
            )

        elif self.service_account_info:
            self.storage_client = storage.Client.from_service_account_info(
                self.service_account_info
            )

        else:
            self.storage_client = storage.Client()

        self.source_bucket_client = self.storage_client.bucket(self.source_bucket)
        self.target_bucket_client = self.storage_client.bucket(self.target_bucket)

        # TODO try to create it with the same configs as source_bucket
        # or using some user input.
        if not self.target_bucket_client.exists():
            raise TargetContainerNotExistError(
                f"Target bucket `{self.target_bucket}` does not exist "
                + "or you do not have enough permissions to access it."
            )

    @property
    def source_dir(self):
        return self._source_dir

    @source_dir.setter
    def source_dir(self, value):
        self._source_dir = add_slash(value)

    @property
    def target_dir(self):
        return self._target_dir

    @target_dir.setter
    def target_dir(self, value):
        self._target_dir = add_slash(value)

    def list_objects(self) -> Iterable[FileObject]:
        if self.recursive:
            delimiter = None
        else:
            delimiter = "/"

        blobs = self.storage_client.list_blobs(
            self.source_bucket, prefix=self.source_dir, delimiter=delimiter
        )

        for blob in blobs:
            if not blob.name.endswith("/"):
                yield FileObject(
                    path=blob.name.removeprefix(self.source_dir), modified_date=blob.updated
                )

    def copy_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        source_blob = self.source_bucket_client.blob(self.source_dir + source_obj.path)
        target_blob = self.source_bucket_client.copy_blob(
            source_blob, self.target_bucket_client, self.target_dir + target_obj.path
        )
        logging.info(
            "Copied: "
            + f"blob {source_blob.name} in bucket {self.source_bucket}"
            + f"to blob {target_blob.name} in bucket {self.target_bucket}"
        )

    def rename_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        if self.source_bucket == self.target_bucket:
            self._rename_within_same_bucket(source_obj, target_obj)
        else:
            self._rename_across_different_buckets(source_obj, target_obj)

        logging.info(
            "Renamed: "
            + f"blob {self.source_dir + source_obj.path} in bucket {self.source_bucket}"
            + f"to blob {self.target_dir + target_obj.path} in bucket {self.target_bucket}"
        )

    def _rename_within_same_bucket(self, source_obj: FileObject, target_obj: FileObject) -> None:
        source_blob = self.source_bucket_client.blob(self.source_dir + source_obj.path)
        self.source_bucket_client.rename_blob(source_blob, self.target_dir + target_obj.path)

    def _rename_across_different_buckets(
        self, source_obj: FileObject, target_obj: FileObject
    ) -> None:
        self.copy_object(source_obj, target_obj)
        self.source_bucket_client.delete_blob(self.source_dir + source_obj.path)

FileSystemLocal

Manage files on the local operating system.

Parameters:

Name | Type | Description | Default
---- | ---- | ----------- | -------
source_dir | str | Source directory to iterate files from. | required
target_dir | str | Target directory to copy/rename files to. | required
recursive | bool | Decides if listing source_dir should be recursive or not. | False
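
A minimal usage sketch; the directory paths are placeholders and the import path is inferred from the source location shown below.

from renameit.file_systems.local import FileSystemLocal  # path inferred from the source location below

fs = FileSystemLocal(
    source_dir="/data/incoming",  # placeholder paths
    target_dir="/data/archive",
    recursive=True,
)

# Copy each file to the same relative path under target_dir.
for obj in fs.list_objects():
    fs.copy_object(obj, obj)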
Source code in renameit/file_systems/local.py
class FileSystemLocal(IFileSystem):
    """Manage files on the local operating system.

    Args:
        source_dir (str): Source directory to iterate files from.
        target_dir (str): Target directory to copy/rename files to.
        recursive (bool, optional): Decides if listing source_dir should be recursive or not.
    """

    name = "local"

    def __init__(self, source_dir: str, target_dir: str, recursive: bool = False) -> None:

        super().__init__()

        self.source_dir = pathlib.Path(source_dir)
        self.target_dir = pathlib.Path(target_dir)
        self.recursive = recursive

    def list_objects(self) -> Iterable[FileObject]:
        if self.recursive:
            glob_method = self.source_dir.rglob
        else:
            glob_method = self.source_dir.glob

        for path in glob_method("*"):
            if path.is_file():
                yield FileObject(
                    path=str(path.relative_to(self.source_dir)),
                    modified_date=datetime.fromtimestamp(path.stat().st_mtime),
                )

    @_create_target_dirs
    def rename_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        logging.info(
            "Renaming: "
            + str(self.source_dir / source_obj.path)
            + " to "
            + str(self.target_dir / target_obj.path)
        )

        (self.source_dir / source_obj.path).rename(self.target_dir / target_obj.path)

    @_create_target_dirs
    def copy_object(self, source_obj: FileObject, target_obj: FileObject) -> None:
        logging.info(
            "Copying: "
            + str(self.source_dir / source_obj.path)
            + " to "
            + str(self.target_dir / target_obj.path)
        )

        shutil.copy(self.source_dir / source_obj.path, self.target_dir / target_obj.path)