Skip to content

Commit 3c14f04

Browse files
author
ZENOTME
committed
support update for memory catalog
1 parent 1a2a8c8 commit 3c14f04

File tree

2 files changed

+168
-18
lines changed

2 files changed

+168
-18
lines changed

crates/catalog/memory/src/catalog.rs

Lines changed: 124 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,30 @@ impl MemoryCatalog {
5353
warehouse_location,
5454
}
5555
}
56+
57+
fn new_metadata_location(&self, location: &str) -> String {
58+
format!("{}/metadata/{}.metadata.json", location, Uuid::new_v4())
59+
}
60+
61+
async fn commit_table(
62+
&self,
63+
table_ident: &TableIdent,
64+
next_metadata: TableMetadata,
65+
) -> Result<()> {
66+
let mut root_namespace_state = self.root_namespace_state.lock().await;
67+
68+
let table_metadata_dir =
69+
root_namespace_state.get_existing_table_metadata_dir(table_ident)?;
70+
let metadata_location = self.new_metadata_location(table_metadata_dir);
71+
self.file_io
72+
.new_output(&metadata_location)?
73+
.write(serde_json::to_vec(&next_metadata)?.into())
74+
.await?;
75+
76+
root_namespace_state.update_table(table_ident, metadata_location)?;
77+
78+
Ok(())
79+
}
5680
}
5781

5882
#[async_trait]
@@ -197,19 +221,14 @@ impl Catalog for MemoryCatalog {
197221
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
198222
.build()?
199223
.metadata;
200-
let metadata_location = format!(
201-
"{}/metadata/{}-{}.metadata.json",
202-
&location,
203-
0,
204-
Uuid::new_v4()
205-
);
224+
let metadata_location = self.new_metadata_location(&location);
206225

207226
self.file_io
208227
.new_output(&metadata_location)?
209228
.write(serde_json::to_vec(&metadata)?.into())
210229
.await?;
211230

212-
root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
231+
root_namespace_state.insert_new_table(&table_ident, location, metadata_location.clone())?;
213232

214233
Table::builder()
215234
.file_io(self.file_io.clone())
@@ -263,19 +282,39 @@ impl Catalog for MemoryCatalog {
263282
let metadata_location = new_root_namespace_state
264283
.get_existing_table_location(src_table_ident)?
265284
.clone();
285+
let metadata_dir = new_root_namespace_state
286+
.get_existing_table_metadata_dir(src_table_ident)?
287+
.clone();
266288
new_root_namespace_state.remove_existing_table(src_table_ident)?;
267-
new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
289+
new_root_namespace_state.insert_new_table(
290+
dst_table_ident,
291+
metadata_dir,
292+
metadata_location,
293+
)?;
268294
*root_namespace_state = new_root_namespace_state;
269295

270296
Ok(())
271297
}
272298

273299
/// Update a table to the catalog.
274-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
275-
Err(Error::new(
276-
ErrorKind::FeatureUnsupported,
277-
"MemoryCatalog does not currently support updating tables.",
278-
))
300+
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
301+
// Apply the update to get the new metadata.
302+
let table = self.load_table(commit.identifier()).await?;
303+
let requirements = commit.take_requirements();
304+
let updates = commit.take_updates();
305+
let metadata = table.metadata().clone();
306+
for requirement in requirements {
307+
requirement.check(Some(&metadata))?;
308+
}
309+
let mut metadata_builder = metadata.into_builder(None);
310+
for update in updates {
311+
metadata_builder = update.apply(metadata_builder)?;
312+
}
313+
314+
self.commit_table(commit.identifier(), metadata_builder.build()?.metadata)
315+
.await?;
316+
317+
self.load_table(commit.identifier()).await
279318
}
280319
}
281320

@@ -287,6 +326,7 @@ mod tests {
287326

288327
use iceberg::io::FileIOBuilder;
289328
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
329+
use iceberg::transaction::Transaction;
290330
use regex::Regex;
291331
use tempfile::TempDir;
292332

@@ -1035,7 +1075,7 @@ mod tests {
10351075
let table_name = "tbl1";
10361076
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
10371077
let expected_table_metadata_location_regex = format!(
1038-
"^{}/tbl1/metadata/0-{}.metadata.json$",
1078+
"^{}/tbl1/metadata/{}.metadata.json$",
10391079
namespace_location, UUID_REGEX_STR,
10401080
);
10411081

@@ -1088,7 +1128,7 @@ mod tests {
10881128
let expected_table_ident =
10891129
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
10901130
let expected_table_metadata_location_regex = format!(
1091-
"^{}/tbl1/metadata/0-{}.metadata.json$",
1131+
"^{}/tbl1/metadata/{}.metadata.json$",
10921132
nested_namespace_location, UUID_REGEX_STR,
10931133
);
10941134

@@ -1129,7 +1169,7 @@ mod tests {
11291169
let table_name = "tbl1";
11301170
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
11311171
let expected_table_metadata_location_regex = format!(
1132-
"^{}/a/tbl1/metadata/0-{}.metadata.json$",
1172+
"^{}/a/tbl1/metadata/{}.metadata.json$",
11331173
warehouse_location, UUID_REGEX_STR
11341174
);
11351175

@@ -1177,7 +1217,7 @@ mod tests {
11771217
let expected_table_ident =
11781218
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
11791219
let expected_table_metadata_location_regex = format!(
1180-
"^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
1220+
"^{}/a/b/tbl1/metadata/{}.metadata.json$",
11811221
warehouse_location, UUID_REGEX_STR
11821222
);
11831223

@@ -1678,4 +1718,71 @@ mod tests {
16781718
),
16791719
);
16801720
}
1721+
1722+
#[tokio::test]
1723+
async fn test_update_table() {
1724+
let catalog = new_memory_catalog();
1725+
let namespace_ident = NamespaceIdent::new("n1".into());
1726+
create_namespace(&catalog, &namespace_ident).await;
1727+
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1728+
create_table(&catalog, &table_ident).await;
1729+
1730+
let table = catalog.load_table(&table_ident).await.unwrap();
1731+
assert!(table.metadata().properties().is_empty());
1732+
1733+
let transaction = Transaction::new(&table);
1734+
let transaction = transaction
1735+
.set_properties(HashMap::from_iter(vec![("k".to_string(), "v".to_string())]))
1736+
.unwrap();
1737+
transaction.commit(&catalog).await.unwrap();
1738+
1739+
let table = catalog.load_table(&table_ident).await.unwrap();
1740+
assert_eq!(
1741+
table.metadata().properties(),
1742+
&HashMap::from_iter(vec![("k".to_string(), "v".to_string())])
1743+
);
1744+
}
1745+
1746+
#[tokio::test]
1747+
async fn test_update_rename_table() {
1748+
let catalog = new_memory_catalog();
1749+
let namespace_ident = NamespaceIdent::new("n1".into());
1750+
create_namespace(&catalog, &namespace_ident).await;
1751+
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1752+
create_table(&catalog, &table_ident).await;
1753+
1754+
let table = catalog.load_table(&table_ident).await.unwrap();
1755+
assert!(table.metadata().properties().is_empty());
1756+
1757+
let transaction = Transaction::new(&table);
1758+
let transaction = transaction
1759+
.set_properties(HashMap::from_iter(vec![("k".to_string(), "v".to_string())]))
1760+
.unwrap();
1761+
transaction.commit(&catalog).await.unwrap();
1762+
1763+
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1764+
catalog
1765+
.rename_table(&table_ident, &dst_table_ident)
1766+
.await
1767+
.unwrap();
1768+
1769+
let table = catalog.load_table(&dst_table_ident).await.unwrap();
1770+
let transaction = Transaction::new(&table);
1771+
let transaction = transaction
1772+
.set_properties(HashMap::from_iter(vec![(
1773+
"k1".to_string(),
1774+
"v2".to_string(),
1775+
)]))
1776+
.unwrap();
1777+
transaction.commit(&catalog).await.unwrap();
1778+
1779+
let table = catalog.load_table(&dst_table_ident).await.unwrap();
1780+
assert_eq!(
1781+
table.metadata().properties(),
1782+
&HashMap::from_iter(vec![
1783+
("k".to_string(), "v".to_string()),
1784+
("k1".to_string(), "v2".to_string())
1785+
])
1786+
);
1787+
}
16811788
}

crates/catalog/memory/src/namespace_state.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub(crate) struct NamespaceState {
2929
namespaces: HashMap<String, NamespaceState>,
3030
// Mapping of tables to metadata locations in this namespace
3131
table_metadata_locations: HashMap<String, String>,
32+
// Mapping of tables to metadata dir locations in this namespace
33+
table_metadata_dirs: HashMap<String, String>,
3234
}
3335

3436
fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
@@ -175,6 +177,7 @@ impl NamespaceState {
175177
properties,
176178
namespaces: HashMap::new(),
177179
table_metadata_locations: HashMap::new(),
180+
table_metadata_dirs: HashMap::new(),
178181
});
179182

180183
Ok(())
@@ -266,6 +269,7 @@ impl NamespaceState {
266269
pub(crate) fn insert_new_table(
267270
&mut self,
268271
table_ident: &TableIdent,
272+
table_metadata_dir: String,
269273
metadata_location: String,
270274
) -> Result<()> {
271275
let namespace = self.get_mut_namespace(table_ident.namespace())?;
@@ -277,9 +281,45 @@ impl NamespaceState {
277281
hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident),
278282
hash_map::Entry::Vacant(entry) => {
279283
let _ = entry.insert(metadata_location);
284+
let dir = namespace
285+
.table_metadata_dirs
286+
.insert(table_ident.name().to_string(), table_metadata_dir);
287+
// New table should not have a metadata dir.
288+
assert_eq!(dir, None);
289+
Ok(())
290+
}
291+
}
292+
}
293+
294+
pub(crate) fn update_table(
295+
&mut self,
296+
table_ident: &TableIdent,
297+
metadata_location: String,
298+
) -> Result<()> {
299+
let namespace = self.get_mut_namespace(table_ident.namespace())?;
280300

301+
match namespace
302+
.table_metadata_locations
303+
.entry(table_ident.name().to_string())
304+
{
305+
hash_map::Entry::Occupied(mut entry) => {
306+
let _ = entry.insert(metadata_location);
281307
Ok(())
282308
}
309+
hash_map::Entry::Vacant(_) => no_such_table_err(table_ident),
310+
}
311+
}
312+
313+
/// Return the metadata dir of the given table or an error if doesn't exist
314+
pub(crate) fn get_existing_table_metadata_dir(
315+
&self,
316+
table_ident: &TableIdent,
317+
) -> Result<&String> {
318+
let namespace = self.get_namespace(table_ident.namespace())?;
319+
320+
match namespace.table_metadata_dirs.get(table_ident.name()) {
321+
None => no_such_table_err(table_ident),
322+
Some(table_metadata_dir) => Ok(table_metadata_dir),
283323
}
284324
}
285325

@@ -292,7 +332,10 @@ impl NamespaceState {
292332
.remove(table_ident.name())
293333
{
294334
None => no_such_table_err(table_ident),
295-
Some(metadata_location) => Ok(metadata_location),
335+
Some(metadata_location) => {
336+
let _ = namespace.table_metadata_dirs.remove(table_ident.name());
337+
Ok(metadata_location)
338+
}
296339
}
297340
}
298341
}

0 commit comments

Comments
 (0)