Fred808 committed on
Commit
d3e88f3
·
verified ·
1 Parent(s): 6b04d23

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +80 -5
app.py CHANGED
@@ -28,7 +28,7 @@ class Settings:
28
  AGGREGATOR_URL = os.getenv("AGGREGATOR_URL", "http://192.168.1.104:8002")
29
 
30
  # Model settings
31
- MODEL_REPO = "https://huggingface.co/microsoft/Florence-2-large"
32
 
33
  # Server settings
34
  TENSOR_SERVER_TIMEOUT = 30 # seconds
@@ -634,10 +634,31 @@ async def register_tensor_server(server_url: HttpUrl):
634
  state.tensor_servers[str(server_url)] = TensorServer(url=server_url)
635
  print(f"[INFO] Registered new tensor server at {server_url}")
636
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
637
  return {
638
  "status": "registered",
639
  "registered_servers": len(state.tensor_servers),
640
- "server_id": str(server_url)
 
 
641
  }
642
 
643
  @app.delete("/unregister_tensor_server")
@@ -721,6 +742,30 @@ async def initialize_system():
721
  else:
722
  files_status[filename] = {"exists": exists, "size_bytes": 0}
723
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
724
  return {
725
  "status": "initialized",
726
  "model_loaded": state.is_model_loaded,
@@ -728,7 +773,10 @@ async def initialize_system():
728
  "total_size_bytes": total_size,
729
  "config_loaded": bool(state.model_config),
730
  "model_type": state.model_config.get("model_type", "unknown"),
731
- "architecture": state.model_config.get("architectures", ["unknown"])[0]
 
 
 
732
  }
733
 
734
  # ===== Main Execution =====
@@ -736,8 +784,35 @@ async def initialize_system():
736
  async def startup_event():
737
  """Initialize the server and start background tasks"""
738
  print("[INFO] Initializing system...")
739
- await initialize_system()
740
- print("[INFO] Model initialization complete")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
741
 
742
  # Start monitoring task
743
  asyncio.create_task(monitor_tensor_servers())
 
28
  AGGREGATOR_URL = os.getenv("AGGREGATOR_URL", "http://192.168.1.104:8002")
29
 
30
  # Model settings
31
+ MODEL_REPO = "https://huggingface.co/facebook/opt-125m"
32
 
33
  # Server settings
34
  TENSOR_SERVER_TIMEOUT = 30 # seconds
 
634
  state.tensor_servers[str(server_url)] = TensorServer(url=server_url)
635
  print(f"[INFO] Registered new tensor server at {server_url}")
636
 
637
+ # If model is loaded, automatically distribute chunks
638
+ if state.is_model_loaded:
639
+ print(f"[INFO] Model is loaded, starting distribution for new server {server_url}")
640
+ try:
641
+ # Create chunks if they don't exist
642
+ if not state.model_chunks:
643
+ if await split_model_weights():
644
+ print(f"[INFO] Successfully split model into {len(state.model_chunks)} chunks")
645
+ else:
646
+ print("[ERROR] Failed to split model weights")
647
+
648
+ # Distribute chunks
649
+ if await distribute_model_chunks():
650
+ print("[INFO] Successfully distributed chunks to tensor servers")
651
+ else:
652
+ print("[ERROR] Failed to distribute chunks")
653
+ except Exception as e:
654
+ print(f"[ERROR] Distribution error during server registration: {str(e)}")
655
+
656
  return {
657
  "status": "registered",
658
  "registered_servers": len(state.tensor_servers),
659
+ "server_id": str(server_url),
660
+ "model_loaded": state.is_model_loaded,
661
+ "chunks_distributed": len(state.model_chunks) if state.model_chunks else 0
662
  }
663
 
664
  @app.delete("/unregister_tensor_server")
 
742
  else:
743
  files_status[filename] = {"exists": exists, "size_bytes": 0}
744
 
745
+ # Start model distribution if we have tensor servers
746
+ distribution_status = "not_started"
747
+ if state.tensor_servers:
748
+ print("[INFO] Starting automatic model distribution...")
749
+ try:
750
+ # Split model into chunks
751
+ if await split_model_weights():
752
+ print(f"[INFO] Successfully split model into {len(state.model_chunks)} chunks")
753
+ # Distribute chunks to servers
754
+ if await distribute_model_chunks():
755
+ print("[INFO] Successfully distributed chunks to tensor servers")
756
+ distribution_status = "completed"
757
+ else:
758
+ print("[ERROR] Failed to distribute chunks")
759
+ distribution_status = "distribution_failed"
760
+ else:
761
+ print("[ERROR] Failed to split model weights")
762
+ distribution_status = "split_failed"
763
+ except Exception as e:
764
+ print(f"[ERROR] Distribution error: {str(e)}")
765
+ distribution_status = f"error: {str(e)}"
766
+ else:
767
+ print("[INFO] No tensor servers registered yet. Will distribute when servers register.")
768
+
769
  return {
770
  "status": "initialized",
771
  "model_loaded": state.is_model_loaded,
 
773
  "total_size_bytes": total_size,
774
  "config_loaded": bool(state.model_config),
775
  "model_type": state.model_config.get("model_type", "unknown"),
776
+ "architecture": state.model_config.get("architectures", ["unknown"])[0],
777
+ "distribution_status": distribution_status,
778
+ "registered_servers": len(state.tensor_servers),
779
+ "chunks_created": len(state.model_chunks) if state.model_chunks else 0
780
  }
781
 
782
  # ===== Main Execution =====
 
784
  async def startup_event():
785
  """Initialize the server and start background tasks"""
786
  print("[INFO] Initializing system...")
787
+ try:
788
+ # Initialize system and download model
789
+ await initialize_system()
790
+ print("[INFO] Model initialization complete")
791
+
792
+ # If we have pre-configured tensor servers, try to connect to them
793
+ if Settings.TENSOR_SERVER_URLS:
794
+ print(f"[INFO] Attempting to connect to {len(Settings.TENSOR_SERVER_URLS)} pre-configured tensor servers...")
795
+ for url in Settings.TENSOR_SERVER_URLS:
796
+ try:
797
+ if await check_tensor_server_health(url):
798
+ state.tensor_servers[str(url)] = TensorServer(url=url)
799
+ print(f"[INFO] Successfully registered pre-configured server at {url}")
800
+ except Exception as e:
801
+ print(f"[WARN] Failed to connect to pre-configured server {url}: {str(e)}")
802
+
803
+ # If we have both model and servers, start distribution
804
+ if state.is_model_loaded and state.tensor_servers:
805
+ print("[INFO] Starting initial model distribution...")
806
+ if await split_model_weights():
807
+ print(f"[INFO] Split model into {len(state.model_chunks)} chunks")
808
+ if await distribute_model_chunks():
809
+ print("[INFO] Successfully completed initial distribution")
810
+ else:
811
+ print("[WARN] Initial distribution failed")
812
+ else:
813
+ print("[WARN] Failed to split model weights")
814
+ except Exception as e:
815
+ print(f"[ERROR] Startup error: {str(e)}")
816
 
817
  # Start monitoring task
818
  asyncio.create_task(monitor_tensor_servers())