#!/usr/bin/env python3 """ 测试币安API连接和数据获取功能 """ import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__))) from binance.client import Client import pandas as pd def test_binance_connection(): """测试币安连接""" try: # 初始化客户端(使用公开API) client = Client("", "") # 测试获取服务器时间 server_time = client.get_server_time() print(f"✅ 币安服务器连接成功,服务器时间: {server_time}") # 测试获取交易对信息 exchange_info = client.get_exchange_info() print(f"✅ 获取交易所信息成功,共有 {len(exchange_info['symbols'])} 个交易对") # 测试获取K线数据 klines = client.get_klines(symbol='BTCUSDT', interval='1h', limit=10) print(f"✅ 获取BTCUSDT K线数据成功,获取到 {len(klines)} 条数据") # 转换为DataFrame并显示 df = pd.DataFrame(klines, columns=[ 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore' ]) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') print("\n最新的5条K线数据:") print(df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].tail()) return True except Exception as e: print(f"❌ 币安连接测试失败: {e}") return False if __name__ == "__main__": print("🚀 开始测试币安API连接...") success = test_binance_connection() if success: print("\n✅ 所有测试通过!币安API集成准备就绪。") else: print("\n❌ 测试失败,请检查网络连接。")