Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
eeroel committed Jul 25, 2023
1 parent 1eab3dc commit 6da464f
Showing 1 changed file with 31 additions and 38 deletions.
69 changes: 31 additions & 38 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl DeltaTable {
let version_start = match get_last_checkpoint(&self.storage).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint, start with version 0
// no checkpoint
-1
}
Err(e) => {
Expand All @@ -429,48 +429,41 @@ impl DeltaTable {

debug!("start with latest checkpoint version: {version_start}");

let max_version = {
// list files to find max version
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
}

lazy_static! {
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}

let version = async {
let mut ver: i64 = version_start;
let prefix_path = self.storage.log_path();
let prefix = Some(prefix_path);
let offset_path = commit_uri_from_version(ver);
let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
let log_ver_str = captures.get(1).unwrap().as_str();
let log_ver: i64 = log_ver_str.parse().unwrap();
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
}

// listing may not be ordered
ver = max(ver, log_ver);
lazy_static! {
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}

// also cache timestamp for version, for faster time-travel
self.version_timestamp.insert(log_ver, obj_meta.last_modified.timestamp());
}
}

if ver < 0 {
return Err(DeltaTableError::not_a_table(self.table_uri()));
// list files to find max version
let version = async {
let mut max_version: i64 = version_start;
let prefix = Some(self.storage.log_path());
let offset_path = commit_uri_from_version(max_version);
let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
let log_version = captures.get(1).unwrap().as_str().parse().unwrap();
// listing may not be ordered
max_version = max(max_version, log_version);
// also cache timestamp for version, for faster time-travel
self.version_timestamp.insert(log_version, obj_meta.last_modified.timestamp());
}
}

if max_version < 0 {
return Err(DeltaTableError::not_a_table(self.table_uri()));
}

Ok::<i64, DeltaTableError>(ver)
};
version.await?
};
Ok::<i64, DeltaTableError>(max_version)
}.await?;

Ok(max_version)
Ok(version)
}

/// Currently loaded version of the table
Expand Down

0 comments on commit 6da464f

Please sign in to comment.