# 9f457963d0544b6c9bcf87b349d.../main.py
# 2024-11-27 18:09:23 +00:00
#
# 276 lines
# 9.1 KiB
# Python

import os
import uuid
import uvloop
from fathom_services.client import FathomClient
from fathom_services.client.auth import FathomAuth
from fathom_services.client.config import BASE_URL_DEFAULT, ServiceAPIs
from fathom_services.client.config import ClientConfig
from fathom_services.client.config import ServicePaths
from fathom_services.client.models import SequenceAPIResponse
from fathom_services.client.models import SequenceColumnsAndRows
from fathom_services.client.models._base import PageQuery
# Token read from the FATHOM_TOKEN environment variable; an empty string is
# used when the variable is not set.
FATHOM_TOKEN = os.getenv("FATHOM_TOKEN", "")

# Module-level auth object carrying that token; passed to FathomClient as
# ``auth=`` by the demo.
TEST_AUTH = FathomAuth()
TEST_AUTH.set_token(token=FATHOM_TOKEN)
# Value types exercised by the demo; one nullable column is created per type.
_DEMO_VALUE_TYPES = ("STRING", "LONG", "DOUBLE", "DATE", "BOOLEAN", "GEO", "UUID")


def _demo_columns():
    """Build the column definitions sent when creating the demo sequence.

    Every value type yields one nullable column named after the lower-cased
    type with a ``1`` suffix, for example::

        {"name": "string1", "description": "some string",
         "valueType": "STRING", "nullable": True}

    A fresh list of fresh dicts is returned on every call.
    """
    return [
        {
            "name": f"{value_type.lower()}1",
            "description": f"some {value_type.lower()}",
            "valueType": value_type,
            "nullable": True,
        }
        for value_type in _DEMO_VALUE_TYPES
    ]


async def _create_demo_sequence(fc):
    """Create the demo sequence through ``fc`` and return the new id."""
    # The same random UUID is used both as data set id and as client id.
    data_set_id = str(uuid.uuid4())

    response = await fc.sequences.create(
        data_set_id=data_set_id,
        name="string",
        columns=_demo_columns(),
        client_id=data_set_id,
        description="string",
    )
    print(type(response))
    print(response)
    return response.id


async def _check_sequence_metadata(fc, sequence_id):
    """Exercise fetch, list (plain and paged), retrieve and update."""
    # Existence check for a sequence that does exist.
    response = await fc.sequences.fetch(sequence_id)
    print(response)
    assert response

    # List all sequences.
    response = await fc.sequences.list()
    print(type(response))
    print(response)

    # Paging: a page size of 1 must yield exactly one item.
    response = await fc.sequences.list(
        page_query=PageQuery(page_number=0, page_size=1)
    )
    print(type(response))
    print(response)
    print(len(response.content))
    assert len(response.content) == 1

    # Retrieve the single sequence.
    response = await fc.sequences.retrieve(sequence_id)
    print(response)

    # Update name, description and data set id, then read the values back.
    update_description = "updated description"
    update_name = "updated name"
    update_data_set_id = str(uuid.uuid4())
    await fc.sequences.update(
        id=sequence_id,
        data_set_id=update_data_set_id,
        name=update_name,
        description=update_description,
    )

    response = await fc.sequences.retrieve(sequence_id)
    print(response)
    assert response.name == update_name
    assert response.description == update_description
    assert response.data_set_id == update_data_set_id


async def _check_sequence_data(fc, sequence_id):
    """Exercise retrieve, insert, update, upsert and remove of row data."""
    # A sequence without inserted rows must come back empty.
    res = await fc.sequences.data.retrieve(id=sequence_id)
    print(res)
    assert len(res) == 0

    # Only row1 and row2 are inserted; row3 first appears in the upsert below.
    create_row_ids = ["row1", "row2", "row3"]
    create_row_ids_len = len(create_row_ids)

    # Insert two rows covering every column.
    seq_data_insert = SequenceColumnsAndRows(
        columns=[
            "boolean1",
            "date1",
            "double1",
            "geo1",
            "long1",
            "string1",
            "uuid1",
        ],
        rows=[
            {
                "id": create_row_ids[0],
                "values": [
                    True,
                    "2023-12-08T13:53:59.609899Z",
                    4.294967297,
                    {"type": "Point", "coordinates": [11.2, -64.1]},
                    4294967297,
                    "string_value1",
                    "3fa85f64-5717-4562-b3fc-2c963f66afa6",
                ],
            },
            {
                "id": create_row_ids[1],
                "values": [
                    False,
                    "2023-12-08T13:53:59.619899Z",
                    -4.294967297,
                    {"type": "Point", "coordinates": [-2.11, 1.64]},
                    -4294967297,
                    "string_value2",
                    "3fa85f64-5717-4562-b3fc-2c963f66afa7",
                ],
            },
        ],
    )
    seq_data_insert_len = len(seq_data_insert.rows)
    res = await fc.sequences.data.insert(id=sequence_id, rows=seq_data_insert)
    print(type(res))
    print(res)
    assert res == SequenceAPIResponse(count=seq_data_insert_len, errors=None)

    res = await fc.sequences.data.retrieve(id=sequence_id)
    print(type(res))
    print(res)
    assert len(res) == seq_data_insert_len

    # Load the retrieved data as a pandas dataframe.
    res_df = res.to_pandas(res.columns)
    print(type(res_df))
    print(res_df)
    print(res_df.dtypes)

    # Update a subset of columns on row1.
    seq_data_update = SequenceColumnsAndRows(
        columns=["geo1", "long1", "string1"],
        rows=[
            {
                "id": create_row_ids[0],
                "values": [
                    {"type": "Point", "coordinates": [11.11, 64.64]},
                    13,
                    "string_value3u",
                ],
            }
        ],
    )
    seq_data_update_len = len(seq_data_update.rows)
    res = await fc.sequences.data.update(id=sequence_id, rows=seq_data_update)
    print(type(res))
    print(res)
    assert res == SequenceAPIResponse(count=seq_data_update_len, errors=None)

    # Upsert row2 (inserted above) together with row3 (not inserted before).
    seq_data_upsert = SequenceColumnsAndRows(
        columns=["geo1", "double1", "string1", "uuid1"],
        rows=[
            {
                "id": create_row_ids[1],
                "values": [
                    {"type": "Point", "coordinates": [2.2, 3.3]},
                    11,
                    "string_value1u",
                    "3fa85f64-5717-4562-b3fc-2c963f66afa7",
                ],
            },
            {
                "id": create_row_ids[2],
                "values": [
                    {"type": "Point", "coordinates": [31.11, 34.64]},
                    14,
                    "string_value4u",
                    "3fa85f64-5717-4562-b3fc-2c963f66afa8",
                ],
            },
        ],
    )
    seq_data_upsert_len = len(seq_data_upsert.rows)
    res = await fc.sequences.data.upsert(id=sequence_id, rows=seq_data_upsert)
    print(type(res))
    print(res)
    assert res == SequenceAPIResponse(count=seq_data_upsert_len, errors=None)

    # After the upsert all three row ids are expected to be present.
    res = await fc.sequences.data.retrieve(id=sequence_id)
    print(type(res))
    print(res)
    assert len(res) == create_row_ids_len

    # Remove all rows again.
    res = await fc.sequences.data.remove(sequence_id, rows=create_row_ids)
    print(type(res))
    print(res)
    assert res == SequenceAPIResponse(count=create_row_ids_len, errors=None)


async def _delete_demo_sequence(fc, sequence_id):
    """Delete the sequence and confirm that it is no longer reported."""
    response = await fc.sequences.remove(sequence_id)
    print(response)

    # Existence check for a sequence that does not exist any more.
    response = await fc.sequences.fetch(sequence_id)
    print(response)
    assert not response


async def demo_sequences():
    """Run the sequences demo against the configured sequence service.

    Opens a ``FathomClient`` whose Sequences API is mapped to
    ``http://sequence-service.dev:8000`` and then, in order:

    1. creates a sequence with one nullable column per demo value type,
    2. exercises the metadata calls (fetch, list, paged list, retrieve,
       update),
    3. exercises the row-data calls (retrieve, insert, to_pandas, update,
       upsert, remove),
    4. deletes the sequence and checks that it is gone.

    Intermediate responses are printed. The checks are plain ``assert``
    statements, so they are skipped when Python runs with ``-O``.

    Returns:
        The string ``"Successfully completed"``.

    Raises:
        AssertionError: If one of the checks on a service response fails.
    """
    service_paths = ServicePaths(
        base_url=BASE_URL_DEFAULT,
        full_path_mappings={
            ServiceAPIs.Sequences: "http://sequence-service.dev:8000",
        },
    )

    async with FathomClient(
        client_config=ClientConfig(debug=True),
        organization_id="3fa85f64-5717-4562-b3fc-2c963f66afb6",
        project_id="3fa85f64-5717-4562-b3fc-2c963f66afa7",
        auth=TEST_AUTH,
        service_paths=service_paths,
    ) as fc:
        print(fc)

        sequence_id = await _create_demo_sequence(fc)
        await _check_sequence_metadata(fc, sequence_id)
        await _check_sequence_data(fc, sequence_id)
        await _delete_demo_sequence(fc, sequence_id)

        print("Successfully completed")
        return "Successfully completed"
async def async_main():
    """Await the sequences demo and return whatever it returns."""
    outcome = await demo_sequences()
    return outcome
def main(request):
    """Synchronous entry point: run the async demo to completion.

    Args:
        request: Incoming request object; it is only printed here.

    Returns:
        The value returned by ``async_main()``.
    """
    print(request)
    # uvloop.run drives the coroutine on a uvloop event loop until it finishes.
    result = uvloop.run(async_main())
    print(result)
    return result