Documentation Index Fetch the complete documentation index at: https://benzinga-2-locadex-parallel-main.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
مستودع GitHub عرض الشيفرة المصدرية والمساهمة فيها
متوافق مع Python 2.6+ و Python 3
لا يعتمد على مكتبات خارجية
يدعم الرسائل الكبيرة
منطق إعادة المحاولة قابل للتهيئة مع تأخير أُسِّي متزايد
ثبّت المكتبة باستخدام setup.py:
git clone https://github.com/Benzinga/python-bztcp.git
cd python-bztcp
python setup.py install
اختبر العميل البرمجي باستخدام العرض التوضيحي المدمج:
python -m bztcp USERNAME API_KEY
python -m bztcp USERNAME API_KEY RETRIES DELAY BACKOFF
python -m bztcp.__main__ USERNAME API_KEY
تتولى فئة bztcp.client.Client إدارة الاتصال وتدفق البيانات:
from __future__ import print_function
from bztcp.client import Client
client = Client( username = 'USERNAME' , key = 'API_KEY' )
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
اضبط سلوك إعادة المحاولة باستخدام أسلوب التراجع الأسي (exponential backoff):
from bztcp.client import Client
client = Client(
username = 'USERNAME' ,
key = 'API_KEY' ,
retries = 5 , # الحد الأقصى لمحاولات إعادة المحاولة
delay = 90 , # التأخير الأولي بالثواني
backoff = 2 # مُضاعِف التراجع التصاعدي
)
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
المعامِل الوصف القيمة الافتراضية usernameاسم مستخدم TCP الخاص بك لدى Benzinga إلزامي keyمفتاح الوصول إلى واجهة برمجة التطبيقات API الخاص بك إلزامي retriesالحد الأقصى لعدد المحاولات - delayالتأخير الابتدائي بين المحاولات (بالثواني) - backoffمعامل التراجع الأسي -
المعالجة منخفضة المستوى للرسائل
للحصول على تحكم أدق في حالة الاتصال والرسائل الفردية:
from bztcp.client import Client, STATUS_STREAM
from bztcp.exceptions import BzException
client = Client( username = 'USERNAME' , key = 'API_KEY' )
while True :
try :
msg = client.next_msg()
if msg.status == STATUS_STREAM :
print ( f "Content item: { msg.data } " )
else :
print ( f "Status: { msg.status } " )
except KeyboardInterrupt :
print ( "تم الإلغاء، جارٍ قطع الاتصال." )
client.disconnect()
break
except BzException as bze:
print ( f "BZ Error: { bze } " )
break
الحالة الوصف STATUS_STREAMرسالة محتوى متدفّقة عادية
Method Description content_items()مولِّد يُنتِج قواميس content next_msg()يُرجِع كائن الرسالة الخام التالية disconnect()يُنهي الاتصال بالخادم بشكل سلس
تُثير المكتبة الاستثناء BzException عند حدوث أخطاء خاصة بـ Benzinga:
from bztcp.exceptions import BzException
try :
for content in client.content_items():
process(content)
except BzException as e:
print ( f "خطأ في Benzinga: { e } " )
except Exception as e:
print ( f "خطأ غير متوقع: { e } " )
#!/usr/bin/env python
from __future__ import print_function
import json
from bztcp.client import Client
def main ():
client = Client(
username = 'YOUR_USERNAME' ,
key = 'YOUR_API_KEY' ,
retries = 5 ,
delay = 30 ,
backoff = 2
)
print ( "بدء بث Benzinga TCP..." )
for content in client.content_items():
# استخراج الحقول الأساسية
content_id = content.get( 'id' )
title = content.get( 'title' , 'بدون عنوان' )
channels = content.get( 'channels' , [])
tickers = [t[ 'name' ] for t in content.get( 'tickers' , [])]
# طباعة الملخص
print ( f "[ { content_id } ] { title } " )
if channels:
print ( f " Channels: { ', ' .join(channels) } " )
if tickers:
print ( f " Tickers: { ', ' .join(tickers) } " )
print ()
if __name__ == '__main__' :
main()