Finalement, j'y suis arrivé : voici un script qui charge les données en streaming pour l'unité de temps 1 minute. Il contient une classe qui épure le flux de façon à ne pas surcharger le fichier CSV, en ne gardant qu'une ligne par minute.
Attention, il faut absolument installer la bibliothèque cliente Lightstreamer : python -m pip install lightstreamer-client-lib
Code : #
# -*- coding: utf-8 -*-
import logging
import os
import sys
import time

import pandas as pd
from lightstreamer.client import (
    ConsoleLoggerProvider,
    ConsoleLogLevel,
    ItemUpdate,
    LightstreamerClient,
    Subscription,
    SubscriptionListener,
)
from trading_ig import IGService, IGStreamService
# Standard logging: bare messages on stdout, INFO level and above.
logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

# Install a console logger provider (INFO level) for the Lightstreamer client.
loggerProvider = ConsoleLoggerProvider(ConsoleLogLevel.INFO)
LightstreamerClient.setLoggerProvider(loggerProvider)
class ChartListener(SubscriptionListener):
    """Subscription listener that appends at most one chart row per minute to a CSV file.

    Every update is logged field by field. A row is appended to ``CSV_PATH``
    only when the update's ``UTM`` value falls in a different minute than the
    last row written, so the file gets one line per minute instead of one
    line per update.

    NOTE(review): the row written is the *first* update seen for a minute,
    not necessarily the final state of that candle. If completed candles are
    wanted, the CONS_END field is presumably the end-of-candle marker —
    confirm against the IG streaming API reference before changing this.
    """

    # Fields read from each update, in the order they are written to the CSV.
    # NOTE(review): the pasted source had "ballots_END"/"ballots_TICK_COUNT",
    # which looks like a forum word filter mangling IG's CONS_END and
    # CONS_TICK_COUNT — confirm against the IG streaming API reference.
    FIELDS = (
        "UTM",
        "BID_OPEN",
        "BID_HIGH",
        "BID_LOW",
        "BID_CLOSE",
        "CONS_END",
        "CONS_TICK_COUNT",
    )

    # File the rows are appended to.
    CSV_PATH = "streaming_data.csv"

    # Minute (timestamp floored to the minute) of the last row written.
    # Stored on the class, so it is shared by every listener instance.
    last_written_minute = None

    def onItemUpdate(self, update: ItemUpdate):
        """Log the update and append it to the CSV if it starts a new minute.

        Args:
            update: The item update delivered by the Lightstreamer client.

        Any exception raised while processing is logged and swallowed so that
        a single bad update does not propagate out of the listener callback.
        """
        logger.info(f"Data received for {update.getItemName()}:")
        try:
            data = {}
            for field in self.FIELDS:
                value = update.getValue(field)
                # Keep missing values as None so every row has the same
                # columns in the same order.
                data[field] = value
                if value is not None:
                    logger.info(f"{field}: {value}")

            if data["UTM"] is None:
                # Without a timestamp the update cannot be assigned to a minute.
                logger.warning("Update without UTM value ignored")
                return

            # UTM is interpreted as epoch milliseconds.
            current_time = pd.to_datetime(int(data["UTM"]), unit="ms")
            # "min" replaces the deprecated "T" alias.
            current_minute = current_time.floor("min")
            if current_minute == ChartListener.last_written_minute:
                return  # a row for this minute has already been written

            data["Timestamp"] = current_time.strftime("%Y-%m-%d %H:%M:%S")
            row = pd.DataFrame([data], columns=[*self.FIELDS, "Timestamp"])
            # index=False: the DataFrame index (always 0) is not data and
            # would shift every column relative to the header.
            row.to_csv(self.CSV_PATH, mode="a", header=False, index=False)
            ChartListener.last_written_minute = current_minute
        except Exception as e:
            logger.exception(f"Error in onItemUpdate: {e}")
def main():
    """Stream 1-minute chart data for one IG epic into ``streaming_data.csv``.

    Credentials are read from the environment variables ``IG_USERNAME``,
    ``IG_PASSWORD``, ``IG_API_KEY`` and ``IG_ACC_NUMBER``; ``IG_ACC_TYPE`` is
    optional and defaults to ``"DEMO"``. The function opens an IG session,
    connects a Lightstreamer client using the session tokens, subscribes a
    ``ChartListener`` to the chart item, then waits until interrupted with
    Ctrl+C before unsubscribing, disconnecting and logging out.
    """
    # IG API parameters: taken from the environment so that no credentials
    # are stored in the source file.
    try:
        username = os.environ["IG_USERNAME"]
        password = os.environ["IG_PASSWORD"]
        api_key = os.environ["IG_API_KEY"]
        acc_number = os.environ["IG_ACC_NUMBER"]
    except KeyError as missing:
        logger.error(f"Missing required environment variable: {missing}")
        return
    acc_type = os.environ.get("IG_ACC_TYPE", "DEMO")

    ig_service = IGService(username, password, api_key, acc_type, acc_number=acc_number)
    ig_service.create_session()

    # Session tokens (CST and X-SECURITY-TOKEN) used to authenticate the stream.
    response = ig_service.session
    cst_token = response.headers['CST']
    x_sec_token = response.headers['X-SECURITY-TOKEN']

    # Lightstreamer endpoint URL for this session.
    ls_endpoint = ig_service.read_session()["lightstreamerEndpoint"]

    # Lightstreamer client configuration.
    ls_client = LightstreamerClient(ls_endpoint, adapterSet="DEFAULT")
    ls_client.connectionDetails.setUser(acc_number)
    ls_client.connectionDetails.setPassword(f"CST-{cst_token}|XST-{x_sec_token}")

    # NOTE(review): the pasted source had "ballots_END"/"ballots_TICK_COUNT",
    # which looks like a forum word filter mangling IG's CONS_END and
    # CONS_TICK_COUNT — confirm against the IG streaming API reference.
    fields = ["UTM", "BID_OPEN", "BID_HIGH", "BID_LOW", "BID_CLOSE", "CONS_END", "CONS_TICK_COUNT"]

    # (Re)create the CSV file with a header only. "Timestamp" is included
    # because the listener appends a formatted time to every row.
    pd.DataFrame(columns=[*fields, "Timestamp"]).to_csv("streaming_data.csv", index=False)

    # Subscription configuration.
    epic = "IX.D.SPTRD.IFE.IP"
    scale = "1MINUTE"
    items = [f"CHART:{epic}:{scale}"]
    chart_subscription = Subscription(
        mode="MERGE",
        items=items,
        fields=fields,
    )
    chart_subscription.addListener(ChartListener())

    # Connect to the Lightstreamer server.
    try:
        ls_client.connect()
        logger.info("Connected to Lightstreamer server")
    except Exception as e:
        logger.error(f"Error connecting to Lightstreamer server: {e}")
        # Best effort: do not leave the IG session open on early exit.
        try:
            ig_service.logout()
        except Exception as logout_error:
            logger.error(f"Error during disconnection and cleanup: {logout_error}")
        return

    # Subscribe.
    try:
        ls_client.subscribe(chart_subscription)
        logger.info("Subscription request sent")
    except Exception as e:
        logger.error(f"Error during subscription: {e}")

    # Main loop: updates arrive through the listener callback, so this thread
    # only has to stay alive. Sleeping avoids spinning a CPU core at 100 %.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Interrupted by user, shutting down")
    except Exception as e:
        logger.error(f"Error in main loop: {e}")

    # Disconnect and clean up.
    try:
        ls_client.unsubscribe(chart_subscription)
        ls_client.disconnect()
        ig_service.logout()
        logger.info("Disconnected and cleaned up")
    except Exception as e:
        logger.error(f"Error during disconnection and cleanup: {e}")
# Run the streaming session only when the file is executed as a script.
if __name__ == "__main__":
    main()