Merged
Changes from 15 commits
.gitignore (3 additions, 0 deletions)
@@ -123,3 +123,6 @@ dmypy.json

# Yarn cache
.yarn/

# untitled notebooks
Untitled.ipynb
jupyter_drives/handlers.py (2 additions, 1 deletion)
@@ -88,7 +88,8 @@ async def get(self, drive: str = "", path: str = ""):

@tornado.web.authenticated
async def post(self, drive: str = "", path: str = ""):
result = await self._manager.new_file(drive, path)
body = self.get_json_body()
result = await self._manager.new_file(drive, path, **body)
self.finish(result)

@tornado.web.authenticated
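The handler change above forwards the parsed JSON body straight into the manager via **body. A minimal sketch (not part of the PR) of the request body this implies, assuming the frontend sends only the "type" field that the updated new_file signature below accepts; the field values here are illustrative.

import json

# Hypothetical request bodies for the POST handler above; each key must match
# a keyword parameter of the manager's new_file method.
new_directory_body = {"type": "directory"}   # expands to new_file(drive, path, type="directory")
new_file_body = {"type": "file"}             # any non-directory type results in a plain object

print(json.dumps(new_directory_body), json.dumps(new_file_body))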
jupyter_drives/manager.py (114 additions, 124 deletions)
@@ -2,7 +2,7 @@
import json
import logging
from typing import Dict, List, Optional, Tuple, Union, Any
from datetime import timedelta
from datetime import timedelta, datetime

import os
import tornado
@@ -17,6 +17,8 @@
from libcloud.storage.providers import get_driver
import pyarrow
from aiobotocore.session import get_session
import fsspec
import s3fs

from .log import get_logger
from .base import DrivesConfig
Expand All @@ -41,12 +43,17 @@ def __init__(self, config: traitlets.config.Config) -> None:
self._client = httpx.AsyncClient()
self._content_managers = {}
self._max_files_listed = 1000

# instantiate fsspec file system
self._file_system = fsspec.filesystem(self._config.provider, asynchronous=True)

# initiate aiobotocore session if we are dealing with S3 drives
if self._config.provider == 's3':
if self._config.access_key_id and self._config.secret_access_key:
self._fixDir_suffix = '/.jupyter-drives-fixDir' # placeholder suffix used to emulate directories on S3
self._s3_clients = {}
self._s3_session = get_session()
self._file_system = s3fs.S3FileSystem(anon=False, asynchronous=True, key=self._config.access_key_id, secret=self._config.secret_access_key, token=self._config.session_token)
else:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
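For context, a minimal standalone sketch (not part of the PR) of the asynchronous s3fs filesystem created above: with asynchronous=True, only the underscore-prefixed coroutine methods such as _exists, _isdir, _info and _touch may be used, and they must be awaited from a running event loop. Credentials and object names below are placeholders.

import asyncio
import s3fs

async def main():
    fs = s3fs.S3FileSystem(
        anon=False,
        asynchronous=True,
        key="<access-key-id>",        # placeholder credentials
        secret="<secret-access-key>",
    )
    await fs.set_session()            # establish the aiobotocore session explicitly
    print(await fs._exists("my-bucket/some/object.txt"))  # hypothetical bucket/key

asyncio.run(main())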
@@ -216,56 +223,47 @@ async def get_contents(self, drive_name, path):

try :
data = []
isDir = False
emptyDir = True # assume we are dealing with an empty directory

chunk_size = 100
if self._max_files_listed < chunk_size:
chunk_size = self._max_files_listed
no_batches = int(self._max_files_listed/chunk_size)

# using Arrow lists as they are recommended for large results
# stream will be an async iterable of RecordBatch
current_batch = 0
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=chunk_size, return_arrow=True)
async for batch in stream:
current_batch += 1
# reached last batch that can be shown (partially)
if current_batch == no_batches + 1:
remaining_files = self._max_files_listed - no_batches*chunk_size

# if content exists we are dealing with a directory
if isDir is False and batch:
isDir = True
emptyDir = False
is_dir = await self._file_system._isdir(drive_name + '/' + path)

if is_dir:
chunk_size = 100
if self._max_files_listed < chunk_size:
chunk_size = self._max_files_listed
no_batches = int(self._max_files_listed/chunk_size)

# using Arrow lists as they are recommended for large results
# stream will be an async iterable of RecordBatch
current_batch = 0
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=chunk_size, return_arrow=True)
async for batch in stream:
current_batch += 1
# reached last batch that can be shown (partially)
if current_batch == no_batches + 1:
remaining_files = self._max_files_listed - no_batches*chunk_size

contents_list = pyarrow.record_batch(batch).to_pylist()
for object in contents_list:
# when listing the last batch (partially), make sure we don't exceed limit
if current_batch == no_batches + 1:
if remaining_files <= 0:
break
remaining_files -= 1
data.append({
"path": object["path"],
"last_modified": object["last_modified"].isoformat(),
"size": object["size"],
})

contents_list = pyarrow.record_batch(batch).to_pylist()
for object in contents_list:
# when listing the last batch (partially), make sure we don't exceed limit
# check if we reached the limit of files that can be listed
if current_batch == no_batches + 1:
if remaining_files <= 0:
break
remaining_files -= 1
data.append({
"path": object["path"],
"last_modified": object["last_modified"].isoformat(),
"size": object["size"],
})

# check if we reached the limit of files that can be listed
if current_batch == no_batches + 1:
break
break

# check if we are dealing with an empty drive
if isDir is False and path != '':
else:
content = b""
# retrieve contents of object
obj = await obs.get_async(self._content_managers[drive_name]["store"], path)
stream = obj.stream(min_chunk_size=5 * 1024 * 1024) # 5MB sized chunks
async for buf in stream:
# if content exists we are dealing with a file
if emptyDir is True and buf:
emptyDir = False
content += buf

# retrieve metadata of object
Expand All @@ -286,18 +284,6 @@ async def get_contents(self, drive_name, path):
"size": metadata["size"]
}

# dealing with the case of an empty directory, making sure it is not an empty file
if emptyDir is True:
ext_list = ['.R', '.bmp', '.csv', '.gif', '.html', '.ipynb', '.jl', '.jpeg', '.jpg', '.json', '.jsonl', '.md', '.ndjson', '.pdf', '.png', '.py', '.svg', '.tif', '.tiff', '.tsv', '.txt', '.webp', '.yaml', '.yml']
object_name = os.path.basename(path)
# if object doesn't contain . or doesn't end in one of the registered extensions
if object_name.find('.') == -1 or ext_list.count(os.path.splitext(object_name)[1]) == 0:
data = []

# remove upper logic once directories are fixed
check = self._check_object(drive_name, path)
print(check)

response = {
"data": data
}
@@ -309,27 +295,33 @@

return response
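The batching logic above caps the listing at _max_files_listed entries: with the values shown (limit 1000, batches of 100), ten full batches are listed and the eleventh contributes only the remainder, which is zero for these defaults. A standalone sketch of the same cap with plain lists, purely for illustration and not part of the PR:

# Standalone illustration of the listing cap used in get_contents.
max_files_listed = 1000
chunk_size = 100
no_batches = max_files_listed // chunk_size              # 10 full batches

# fake data standing in for the Arrow record batches streamed by obs.list
batches = [[f"obj-{b}-{i}" for i in range(chunk_size)] for b in range(12)]

listed = []
for current_batch, batch in enumerate(batches, start=1):
    if current_batch == no_batches + 1:
        remaining_files = max_files_listed - no_batches * chunk_size
    for obj in batch:
        if current_batch == no_batches + 1:
            if remaining_files <= 0:
                break
            remaining_files -= 1
        listed.append(obj)
    if current_batch == no_batches + 1:
        break

print(len(listed))   # never exceeds max_files_listed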

async def new_file(self, drive_name, path):
async def new_file(self, drive_name, path, type):
"""Create a new file or directory at the given path.

Args:
drive_name: name of drive where the new content is created
path: path where new content should be created
type: whether we are dealing with a file or a directory
"""
data = {}
try:
# eliminate leading and trailing slashes
path = path.strip('/')

# TO DO: switch to mode "created", which is not implemented yet
await obs.put_async(self._content_managers[drive_name]["store"], path, b"", mode = "overwrite")
metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
object_name = drive_name + '/' + path
# in the case of S3 directories, append a suffix to emulate the creation of a directory
if type == 'directory' and self._config.provider == 's3':
object_name = object_name + self._fixDir_suffix

await self._file_system._touch(object_name)
metadata = await self._file_system._info(object_name)

data = {
"path": path,
"content": "",
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
"last_modified": metadata["LastModified"].isoformat(),
"size": metadata["size"],
"type": metadata["type"]
}
except Exception as e:
raise tornado.web.HTTPError(
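Since S3 has no real directories, new_file now emulates one by touching a zero-byte placeholder object whose key ends in the .jupyter-drives-fixDir suffix. A short sketch of that convention (not part of the PR), with hypothetical drive and path names:

import s3fs

FIXDIR_SUFFIX = "/.jupyter-drives-fixDir"

async def create_directory(fs: s3fs.S3FileSystem, drive: str, path: str) -> None:
    # e.g. drive="my-bucket", path="notebooks/experiments" (hypothetical names)
    await fs._touch(drive + "/" + path + FIXDIR_SUFFIX)
    # listing "my-bucket/notebooks/experiments" now reports a directory,
    # because at least one object exists under that prefix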
@@ -371,23 +363,22 @@ async def save_file(self, drive_name, path, content, options_format, content_for
byte_array = bytearray(slice_)
byte_arrays.append(byte_array)

# combine byte arrays and wrap in a BytesIO object
formatted_content = BytesIO(b"".join(byte_arrays))
formatted_content.seek(0) # reset cursor for any further reading
# combine byte arrays
formatted_content = b"".join(byte_arrays)
elif options_format == 'text':
formatted_content = content.encode("utf-8")
else:
formatted_content = content
if formatted_content is None or formatted_content == '':
formatted_content = b''

await obs.put_async(self._content_managers[drive_name]["store"], path, formatted_content, mode = "overwrite")
metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
await self._file_system._pipe(drive_name + '/' + path, formatted_content)
metadata = await self._file_system._info(drive_name + '/' + path)

data = {
"path": path,
"content": content,
"last_modified": metadata["last_modified"].isoformat(),
"last_modified": metadata["LastModified"].isoformat(),
"size": metadata["size"]
}
except Exception as e:
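save_file now goes through two fsspec calls: _pipe writes the raw bytes to the object, and _info reads back S3 metadata, which exposes the timestamp under the "LastModified" key rather than obstore's "last_modified". A minimal sketch (not part of the PR) with hypothetical names:

import s3fs

async def write_and_stat(fs: s3fs.S3FileSystem) -> None:
    await fs._pipe("my-bucket/notes.txt", b"hello drive")   # hypothetical object
    meta = await fs._info("my-bucket/notes.txt")
    print(meta["LastModified"].isoformat(), meta["size"])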
@@ -414,12 +405,20 @@ async def rename_file(self, drive_name, path, new_path):
# eliminate leading and trailing slashes
path = path.strip('/')

await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)
metadata = await obs.head_async(self._content_managers[drive_name]["store"], new_path)
object_name = drive_name + '/' + path
new_object_name = drive_name + '/' + new_path
is_dir = await self._file_system._isdir(object_name)
if is_dir:
object_name = object_name + self._fixDir_suffix
new_object_name = new_object_name + self._fixDir_suffix
await self._fix_dir(drive_name, path)

await self._file_system._mv_file(object_name, new_object_name)
metadata = await self._file_system._info(new_object_name)

data = {
"path": new_path,
"last_modified": metadata["last_modified"].isoformat(),
"last_modified": metadata["LastModified"].isoformat(),
"size": metadata["size"]
}
except Exception as e:
@@ -443,7 +442,17 @@ async def delete_file(self, drive_name, path):
try:
# eliminate leading and trailing slashes
path = path.strip('/')
await obs.delete_async(self._content_managers[drive_name]["store"], path)
is_dir = await self._file_system._isdir(drive_name + '/' + path)
if is_dir:
await self._fix_dir(drive_name, path)
await self._file_system._rm(drive_name + '/' + path, recursive = True)

# check for leftover console-created folder markers and delete them
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
async for batch in stream:
contents_list = pyarrow.record_batch(batch).to_pylist()
for object in contents_list:
await self._fix_dir(drive_name, object["path"], delete_only = True)

except Exception as e:
raise tornado.web.HTTPError(
Expand All @@ -453,25 +462,6 @@ async def delete_file(self, drive_name, path):

return

async def check_file(self, drive_name, path):
"""Check if an object already exists within a drive.

Args:
drive_name: name of drive where object exists
path: path where content is located
"""
try:
# eliminate leading and trailing backslashes
path = path.strip('/')
await obs.head_async(self._content_managers[drive_name]["store"], path)
except Exception:
raise tornado.web.HTTPError(
status_code= httpx.codes.NOT_FOUND,
reason="Object does not already exist within drive.",
)

return

async def copy_file(self, drive_name, path, to_path, to_drive):
"""Save file with new content.

@@ -486,29 +476,26 @@ async def copy_file(self, drive_name, path, to_path, to_drive):
# eliminate leading and trailing slashes
path = path.strip('/')

# copy object within same drive
object_name = drive_name + '/' + path
# copy objects within same drive
if to_drive == drive_name:
await obs.copy_async(self._content_managers[drive_name]["store"], path, to_path)
metadata = await obs.head_async(self._content_managers[drive_name]["store"], to_path)
# copy object to another drive
to_object_name = drive_name + '/' + to_path
# copy objects to another drive
else:
content = b''
try:
# retrieving contents of file
file = await obs.get_async(self._content_managers[drive_name]["store"], path)
stream = file.stream(min_chunk_size=5 * 1024 * 1024) # 5MB sized chunks
async for buf in stream:
content += buf
except:
# dealing with a directory, no contents to retrieve
pass

await obs.put_async(self._content_managers[to_drive]["store"], to_path, content)
metadata = await obs.head_async(self._content_managers[to_drive]["store"], to_path)
to_object_name = to_drive + '/' + to_path

is_dir = await self._file_system._isdir(object_name)
if is_dir:
object_name = object_name + self._fixDir_suffix
to_object_name = to_object_name + self._fixDir_suffix
await self._fix_dir(drive_name, path)

await self._file_system._copy(object_name, to_object_name)
metadata = await self._file_system._info(to_object_name)

data = {
"path": to_path,
"last_modified": metadata["last_modified"].isoformat(),
"last_modified": metadata["LastModified"].isoformat(),
"size": metadata["size"]
}
except Exception as e:
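Because the bucket name is part of the fsspec path, the same filesystem instance can copy between two drives, so the within-drive and cross-drive branches above both reduce to a single _copy call. A small sketch (not part of the PR) with hypothetical bucket and object names:

import s3fs

async def cross_drive_copy(fs: s3fs.S3FileSystem) -> None:
    await fs._copy("source-bucket/report.csv", "archive-bucket/report.csv")
    meta = await fs._info("archive-bucket/report.csv")
    print(meta["size"])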
@@ -572,29 +559,32 @@ async def _get_drive_location(self, drive_name):

return location

def _check_object(self, drive_name, path):
"""Helping function to check if we are dealing with an empty file or directory.

async def _fix_dir(self, drive_name, path, delete_only = False):
"""Helping function to fix a directory. It applies to the S3 folders created in the AWS console.
Args:
drive_name: name of drive where object exists
path: path to object to check
path: path of object to fix
"""
isDir = False
try:
location = self._content_managers[drive_name]["location"]
if location not in self._s3_clients:
self._s3_clients[location] = self._s3_session.client('s3', location)

listing = self._s3_clients[location].list_objects_v2(Bucket = drive_name, Prefix = path + '/')
if 'Contents' in listing:
isDir = True
try:
check = await self._file_system._exists(drive_name + '/' + path + self._fixDir_suffix)
if check: # directory has the right format
return
else: # directory was created from console
# delete original object
async with self._s3_session.create_client('s3', aws_secret_access_key=self._config.secret_access_key, aws_access_key_id=self._config.access_key_id, aws_session_token=self._config.session_token) as client:
await client.delete_object(Bucket=drive_name, Key=path+'/')
if delete_only:
return
# create new directory
await self._file_system._touch(drive_name + '/' + path + self._fixDir_suffix)
except Exception as e:
raise tornado.web.HTTPError(
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
reason=f"The following error occured when retriving the drive location: {e}",
reason=f"The following error occured when fixing the directory object: {e}",
)

return isDir
return
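A standalone sketch of the _fix_dir idea, not part of the PR, with hypothetical names and simplified error handling: a folder created from the AWS console exists only as a zero-byte "path/" marker object, so it is replaced with the extension's own .jupyter-drives-fixDir placeholder (or merely deleted when delete_only is set).

import s3fs
from aiobotocore.session import get_session

FIXDIR_SUFFIX = "/.jupyter-drives-fixDir"

async def fix_dir(fs: s3fs.S3FileSystem, bucket: str, path: str, delete_only: bool = False) -> None:
    if await fs._exists(f"{bucket}/{path}{FIXDIR_SUFFIX}"):
        return                                           # already in the expected format
    session = get_session()
    async with session.create_client("s3") as client:    # credentials taken from the environment
        await client.delete_object(Bucket=bucket, Key=path + "/")
    if delete_only:
        return
    await fs._touch(f"{bucket}/{path}{FIXDIR_SUFFIX}")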

async def _call_provider(
self,