Files
cmg-commands/split_zip_into_under_5GB_chunks.py
2024-12-11 23:40:25 -06:00

73 lines
2.7 KiB
Python
Executable File

#!/Users/shelbybark/code/cmg_split_zip_into_chunks/venv/bin/python3
import os
from zipfile import ZipFile, ZIP_DEFLATED
import io
def get_zip_path():
    """Ask the user for the location of a zip archive until it names a file.

    The prompt is repeated, with an explanatory message, for as long as the
    entered path does not refer to an existing regular file.

    Returns:
        The path exactly as typed by the user, once ``os.path.isfile``
        accepts it.
    """
    candidate = input("Enter the path to the zipped file: ")
    while not os.path.isfile(candidate):
        print(f"The file at {candidate} does not exist. Please try again.")
        candidate = input("Enter the path to the zipped file: ")
    return candidate
def _plan_groups(members, max_group_size):
    """Partition archive members into consecutive, size-bounded groups.

    Members keep their archive order. A new group is started whenever adding
    the next member would push the running total of uncompressed sizes past
    ``max_group_size``. A single member larger than the limit cannot be split
    and therefore ends up alone in its own (oversized) group.

    Args:
        members: Iterable of ``ZipInfo`` objects.
        max_group_size: Upper bound, in bytes, on the summed ``file_size`` of
            the members in one group.

    Returns:
        A list of non-empty lists of ``ZipInfo`` objects.
    """
    groups = []
    current = []
    current_size = 0
    for member in members:
        if current and current_size + member.file_size > max_group_size:
            groups.append(current)
            current = []
            current_size = 0
        current.append(member)
        current_size += member.file_size
    if current:
        groups.append(current)
    return groups


def split_and_zip(file_path, max_group_size=5 * 1024 ** 3, output_dir='.',
                  delete_source=True):
    """Split a zip archive into several smaller zip archives.

    The members of ``file_path`` are packed, in archive order, into groups
    whose summed *uncompressed* size does not exceed ``max_group_size``. Each
    group is written to ``output_group_<i>.zip`` inside ``output_dir`` using
    deflate compression. Member data is streamed in chunks, so large members
    are never held in memory as a whole.

    NOTE(review): the limit is applied to uncompressed sizes, as in the
    original script. The resulting archives are normally smaller than that,
    but zip headers plus incompressible data could land marginally above the
    limit -- confirm this is acceptable for the intended 5 GB ceiling.

    Args:
        file_path: Path of the zip archive to split.
        max_group_size: Maximum summed uncompressed size, in bytes, of the
            members placed in one output archive. Defaults to 5 GiB.
        output_dir: Directory that receives the output archives; created if
            missing. Defaults to the current working directory.
        delete_source: When true (the default), remove ``file_path`` once
            every output archive has been written successfully.

    Returns:
        The list of output archive paths, in group order. An empty source
        archive yields an empty list and is left in place.

    Raises:
        ValueError: If ``max_group_size`` is not positive, or if an output
            path would overwrite the source archive.
    """
    # Imported here so the function does not depend on edits to the
    # module-level import block.
    import shutil
    from zipfile import ZipInfo

    if max_group_size <= 0:
        raise ValueError("max_group_size must be a positive number of bytes")

    bytes_per_gb = 1024 ** 3
    copy_chunk_size = 1024 * 1024
    output_paths = []

    with ZipFile(file_path, 'r') as zip_ref:
        members = zip_ref.infolist()
        total_size = sum(member.file_size for member in members)
        groups = _plan_groups(members, max_group_size)

        print(f"Max group size: {max_group_size / bytes_per_gb:.6f} GB")
        print(f"Total size: {total_size / bytes_per_gb:.6f} GB")
        print(f"Number of groups: {len(groups)}")

        if not groups:
            # Nothing to split; keep the (empty) source untouched.
            return output_paths

        for member in members:
            if member.file_size > max_group_size:
                print(f"Warning: {member.filename} alone exceeds the maximum "
                      f"group size and is placed in its own archive.")

        planned_paths = [
            os.path.join(output_dir, f'output_group_{index}.zip')
            for index in range(len(groups))
        ]
        # Opening an output in 'w' mode truncates it, so refuse to clobber
        # the archive that is still being read.
        source_abs = os.path.abspath(file_path)
        for planned in planned_paths:
            if os.path.abspath(planned) == source_abs:
                raise ValueError(
                    f"Output path {planned} would overwrite the source archive"
                )

        os.makedirs(output_dir, exist_ok=True)

        for group, out_path in zip(groups, planned_paths):
            with ZipFile(out_path, 'w', compression=ZIP_DEFLATED) as output_zip:
                for member in group:
                    # Use a fresh ZipInfo: opening an entry for writing
                    # mutates the ZipInfo it is given, and the source
                    # archive's own records must stay intact.
                    target = ZipInfo(filename=member.filename,
                                     date_time=member.date_time)
                    target.compress_type = ZIP_DEFLATED
                    target.create_system = member.create_system
                    target.external_attr = member.external_attr
                    # The declared size lets zipfile decide up front whether
                    # the entry needs ZIP64 headers.
                    target.file_size = member.file_size
                    with zip_ref.open(member, 'r') as src, \
                            output_zip.open(target, 'w') as dst:
                        shutil.copyfileobj(src, dst, copy_chunk_size)
            output_paths.append(out_path)

    # Remove the source only after every output archive has been closed.
    if delete_source:
        os.remove(file_path)
    return output_paths
if __name__ == "__main__":
    # Run the interactive workflow only when executed as a script, so that
    # merely importing this module never prompts for input or touches an
    # archive on disk.
    zip_file = get_zip_path()
    split_and_zip(zip_file)