Skip to content

Commit a986bb6

Browse files
committed
revise coordinator
Signed-off-by: Lei Wang <[email protected]>
1 parent 7533a49 commit a986bb6

File tree

6 files changed

+64
-20
lines changed

6 files changed

+64
-20
lines changed

coordinator/flex/server/__main__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
import connexion
44

55
from flex.server import encoder
6+
from datetime import datetime
67

78

89
def main():
10+
current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
11+
with open("/tmp/cluster_create_time.txt", "w") as f:
12+
f.write(current_timestamp)
913
app = connexion.App(__name__, specification_dir='./openapi/')
1014
app.app.json_encoder = encoder.JSONEncoder
1115
app.add_api('openapi.yaml',

coordinator/flex/server/controllers/data_source_controller.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ def get_datasource_by_id(graph_id): # noqa: E501
7272
except Exception as e:
7373
return "Failed to get data source: " + str(e), 500
7474

75+
if data_source_config is None:
76+
return (SchemaMapping.from_dict({}), 200)
77+
7578
data_source_config = json.loads(data_source_config.decode("utf-8"))
7679

7780
return (SchemaMapping.from_dict(data_source_config), 200)

coordinator/flex/server/controllers/deployment_controller.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def get_deployment_info(): # noqa: E501
2222
"""
2323
result_dict = {}
2424
result_dict["cluster_type"] = "KUBERNETES"
25-
with open ("/tmp/graph_schema_create_time.txt", "r") as f:
25+
with open ("/tmp/cluster_create_time.txt", "r") as f:
2626
result_dict["creation_time"] = f.read()
2727
result_dict["instance_name"] = "gart"
2828
result_dict["frontend"] = "Cypher/Gremlin"

coordinator/flex/server/controllers/graph_controller.py

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,30 @@ def get_graph_schema():
6464
rg_mapping_str = rg_mapping_str.decode("utf-8")
6565
break
6666
try_times += 1
67-
time.sleep(1)
67+
time.sleep(0.2)
6868
except Exception as e:
6969
try_times += 1
70-
time.sleep(1)
70+
time.sleep(0.2)
7171

7272
if try_times == try_max_times:
73+
try_times = 0
74+
original_graph_schema_key = etcd_prefix + "gart_graph_schema_json"
75+
while try_times < try_max_times:
76+
try:
77+
original_graph_schema_str, _ = etcd_client.get(original_graph_schema_key)
78+
if original_graph_schema_str is not None:
79+
original_graph_schema_str = original_graph_schema_str.decode("utf-8")
80+
break
81+
try_times += 1
82+
time.sleep(0.2)
83+
except Exception as e:
84+
try_times += 1
85+
time.sleep(0.2)
86+
if try_times == try_max_times:
87+
return result_dict
88+
result_dict["name"] = GRAPH_ID
89+
result_dict["id"] = GRAPH_ID
90+
result_dict["schema"] = json.loads(original_graph_schema_str)["schema"]
7391
return result_dict
7492

7593
rg_mapping = yaml.load(rg_mapping_str, Loader=yaml.SafeLoader)
@@ -83,10 +101,10 @@ def get_graph_schema():
83101
table_schema_str = table_schema_str.decode("utf-8")
84102
break
85103
try_times += 1
86-
time.sleep(1)
104+
time.sleep(0.2)
87105
except Exception as e:
88106
try_times += 1
89-
time.sleep(1)
107+
time.sleep(0.2)
90108

91109
if try_times == try_max_times:
92110
return result_dict
@@ -463,12 +481,19 @@ def get_graph_by_id(graph_id): # noqa: E501
463481
result_dict["id"] = graph_id
464482
result_dict["name"] = graph_id
465483

466-
with open("/tmp/graph_schema_create_time.txt", "r") as f:
467-
result_dict["creation_time"] = f.read()
468-
result_dict["schema_update_time"] = result_dict["creation_time"]
469-
470-
with open("/tmp/data_loading_job_created_time.txt", "r") as f:
471-
result_dict["data_update_time"] = f.read()
484+
try:
485+
with open("/tmp/graph_schema_create_time.txt", "r") as f:
486+
result_dict["creation_time"] = f.read()
487+
result_dict["schema_update_time"] = result_dict["creation_time"]
488+
except:
489+
result_dict["creation_time"] = ""
490+
result_dict["schema_update_time"] = ""
491+
492+
try:
493+
with open("/tmp/data_loading_job_created_time.txt", "r") as f:
494+
result_dict["data_update_time"] = f.read()
495+
except:
496+
result_dict["data_update_time"] = ""
472497

473498
return (GetGraphResponse.from_dict(result_dict), 200)
474499

@@ -553,12 +578,19 @@ def list_graphs(): # noqa: E501
553578
if not result_dict:
554579
return ([GetGraphResponse.from_dict({})], 200)
555580

556-
with open("/tmp/graph_schema_create_time.txt", "r") as f:
557-
result_dict["creation_time"] = f.read()
558-
result_dict["schema_update_time"] = result_dict["creation_time"]
559-
560-
with open("/tmp/data_loading_job_created_time.txt", "r") as f:
561-
result_dict["data_update_time"] = f.read()
581+
try:
582+
with open("/tmp/graph_schema_create_time.txt", "r") as f:
583+
result_dict["creation_time"] = f.read()
584+
result_dict["schema_update_time"] = result_dict["creation_time"]
585+
except:
586+
result_dict["creation_time"] = ""
587+
result_dict["schema_update_time"] = ""
588+
589+
try:
590+
with open("/tmp/data_loading_job_created_time.txt", "r") as f:
591+
result_dict["data_update_time"] = f.read()
592+
except:
593+
result_dict["data_update_time"] = ""
562594

563595
return ([GetGraphResponse.from_dict(result_dict)], 200)
564596

coordinator/flex/server/controllers/service_controller.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,11 @@ def list_service_status(): # noqa: E501
6868
gremlin_service_name = os.getenv('GREMLIN_SERVICE_NAME', 'gremlin-service')
6969
gremlin_service_port = os.getenv('GIE_GREMLIN_PORT', '8182')
7070
gremlin_service_ip = get_external_ip_of_a_service(gremlin_service_name, k8s_namespace)
71-
with open("/tmp/graph_id.txt", "r") as f:
72-
graph_id = f.read()
71+
try:
72+
with open("/tmp/graph_id.txt", "r") as f:
73+
graph_id = f.read()
74+
except:
75+
graph_id = "gart_graph"
7376
result_dict["graph_id"] = graph_id
7477
result_dict["status"] = "Running"
7578
result_dict["sdk_endpoints"] = {}

scripts/controller.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ def get_all_available_read_epochs_internal():
610610
num_fragment = os.getenv("SUBGRAPH_NUM", "1")
611611
latest_read_epoch = get_latest_read_epoch()
612612
if latest_read_epoch == 2**64 - 1:
613-
return []
613+
return [[], []]
614614
available_epochs = []
615615
available_epochs_internal = []
616616
for epoch in range(latest_read_epoch + 1):
@@ -659,6 +659,8 @@ def get_latest_read_epoch():
659659
etcd_key = etcd_prefix + "gart_latest_epoch_p" + str(idx)
660660
try:
661661
etcd_value, _ = etcd_client.get(etcd_key)
662+
if etcd_value is None:
663+
etcd_value = 2**64 - 1
662664
if latest_epoch > int(etcd_value):
663665
latest_epoch = int(etcd_value)
664666
except Exception as e:

0 commit comments

Comments
 (0)