Integration

Learn how to integrate with the ProphetX Service API in a sandbox environment, including generating API tokens and subscribing to web sockets.

ProphetX Service API — Integration

ProphetX has an API-based interaction to take advantage of its SweepStake platform. This article will go through a dummy integration experience in the sandbox environment.

API Status Page:

API Status

End Integration Demo:

Note: If you are already in contact with the Prophet Exchange team, you can get a sandbox account by contacting a team member via Slack or email [email protected].

For more information on how to become an API user, you can also contact Doug for more details. In this article, we are going through the integration in the sandbox environment, which will include demo funds for testing purposes.

Register at ProphetX Registration

Image

Since this is a sandbox environment, we do not need to worry if the birthday or address is wrong, but do make sure the phone number is correct as 2FA is needed even for the sandbox registration.

Contact us to request them to set your account to be an API account and add some testing funds. You can reach out to [email protected] or [email protected].

Generate API Tokens

After your account is approved as an API account, you are ready to generate tokens for the API integrations.

  1. Start a new session and log into ProphetX Sandbox with your credentials.

  2. Click the top right toggle, navigate to the Prophet Cash side, and click “API integration” in the dropdown.

    Image
  3. Click “create a new token”. Multiple tokens can be created, and each token can have multiple sessions. Multiple tokens can be used for different integration processes, and gives the freedom to revoke one of them without affecting all other integrations. In future updates, each token will also have different permissions, so more specific controls can be available.

Integration Steps:

GitHub code: API Integration Guide

API documents:

Keys needed:

Major Steps:

  1. Exchange a session key by using Access Key and Secret Key
  2. (Optional) Get your current wallet balance
  3. Seeding events based on tournaments you want to play
  4. Subscribe to web socket to get updates on sports events, market liquidity, and more
  5. Start to place/cancel plays

Step 1: Exchange Session Keys

Endpoint: partner/auth/login

Before we can call any other APIs, we need to exchange a short-lived (20 minutes) access token, and a relatively long-lived (3 days) refresh token using the access key and secret key combination. This access token needs to be provided for all following transactions, including subscribing to web sockets.

Example code:

def mm_login(self) -> dict:
    """
    Login to MM API, and store session keys in self.mm_session
    """
    login_url = urljoin(self.base_url, config.URL['mm_login'])
    request_body = {
        'access_key': self.mm_keys.get('access_key'),
        'secret_key': self.mm_keys.get('secret_key'),
    }
    response = requests.post(login_url, data=json.dumps(request_body))
    if response.status_code != 200:
        logging.debug(response)
        logging.debug("Please check your access key and secret key to the user_info.json")
        raise Exception("login failed")
    mm_session = json.loads(response.content)['data']
    logging.info(mm_session)
    self.mm_session = mm_session
    logging.info("MM session started")
    return mm_session

Authentication in all APIs is through the header:

def __get_auth_header(self) -> dict:
    return {
        'Authorization': f'Bearer '
                         f'{self.mm_session["access_token"]}',
    }

Step 2: (Optional) Get Your Current Wallet Balance

Endpoint: partner/mm/get_balance

Just in case you are interested in how much money is left in the wallet, a quick endpoint to get available balances.

Example code:

def get_balance(self):
    balance_url = urljoin(self.base_url, config.URL['mm_balance'])
    response = requests.get(balance_url, headers=self.__get_auth_header())
    if response.status_code != 200:
        logging.error("failed to get balance")
        return
    self.balance = json.loads(response.content).get('data', {}).get('balance', 0)
    logging.info(f"still have ${self.balance} left")

Step 3: Seeding Events Based on Tournaments

Endpoints:

  • partner/mm/get_tournaments
  • partner/mm/get_sport_events
  • partner/mm/get_markets
  • partner/mm/get_multiple_markets

The get_tournaments endpoint returns a list of tournaments the company supports. Refer to “API documents” for various options. The ID of the tournament is needed to call the second endpoint get_sports_events to get a list of events that are currently open to play. The same tournament ID is needed for the next step to subscribe to their web socket channels to get updates. Take MLB for example, in the code we find the ID for MLB, and then get all active events of the tournament.

Example code:

def seeding(self):
    """Get odds ladder, as only wagers having odds in the odds ladder are allowed.
    Get all tournaments/events and markets details of each event.
    :return:
    """
    logging.info("start to get odds ladder")
    odds_ladder_url = urljoin(self.base_url, config.URL['mm_odds_ladder'])
    odds_response = requests.get(odds_ladder_url, headers=self.__get_auth_header())
    if odds_response.status_code != 200:
        logging.info("not able to get valid odds from api, fall back to local constants")
        self.valid_odds = constants.VALID_ODDS_BACKUP
    else:
        self.valid_odds = odds_response.json()['data']

    # initiate available tournaments/sport_events
    # tournaments
    logging.info("start seeding tournaments/events/markets")
    t_url = urljoin(self.base_url, config.URL['mm_tournaments'])
    headers = self.__get_auth_header()
    all_tournaments_response = requests.get(t_url, headers=headers)
    if all_tournaments_response.status_code != 200:
        raise Exception("not able to seed tournaments")
    all_tournaments = json.loads(all_tournaments_response.content).get('data', {}).get('tournaments', {})
    self.all_tournaments = all_tournaments

    # get sport events and markets of each event
    event_url = urljoin(self.base_url, config.URL['mm_events'])
    multiple_markets_url = urljoin(self.base_url, config.URL['mm_multiple_markets'])
    for one_t in all_tournaments:
        if one_t['name'] in config.TOURNAMENTS_INTERESTED:
            self.my_tournaments[one_t['id']] = one_t
            events_response = requests.get(event_url, params={'tournament_id': one_t['id']}, headers=headers)
            if events_response.status_code == 200:
                events = json.loads(events_response.content).get('data', {}).get('sport_events')
                if events is None:
                    continue
                event_ids = ','.join([str(event['event_id']) for event in events])
                # instead of getting markets of one event at a time,
                # using get_multiple_markets to batch get markets of a list of events
                multiple_markets_response = requests.get(multiple_markets_url, params={'event_ids': event_ids},
                                                   headers=headers)
                if multiple_markets_response.status_code == 200:
                    map_market_by_event_id = json.loads(multiple_markets_response.content).get('data', {})
                    for event in events:
                        event['markets'] = map_market_by_event_id[str(event['event_id'])]
                        self.sport_events[event['event_id']] = event
                        logging.info(f'successfully get markets of events {event["name"]}')
                else:
                    logging.info(f'failed to get markets of events ids: {".join([str(event["event_id"]) for event in events])}')
            else:
                logging.info(f'skip tournament {one_t["name"]} as api request failed')
    logging.info("Done, seeding")
    logging.info(f"found {len(self.my_tournaments)} tournament, ingested {len(self.sport_events)} "
                 f"sport events from {len(config.TOURNAMENTS_INTERESTED)} tournaments")

Step 4: Subscribe to Web Sockets

Subscribe to web sockets to get updates on sports events and market liquidity. Websocket is powered by Pusher (Pusher) under the hood.

Get connection-config:

  1. Use endpoint /partner/websocket/connection-config to get connection info (it will include key, app_id, and cluster_id used to create connection to pusher in below section)

Endpoints: partner/mm/pusher

In order to get sport event and market liquidity updates, we need to subscribe to their web socket to get this information pushed back to us. There are mostly two channels we need to subscribe to:

  1. Broadcast channel (private-broadcast-service=3-device_type=5), where the same information gets pushed to all Market Makers.
  2. Private channel (private-service=3-device_type=5-user=<partnerId>), where information on your wagers is pushed through.

No need to construct these two channels explicitly, the endpoint partner/mm/pusher can be called directly to get these two channels, and events that are available in each channel.

In this example, I subscribed to all tournaments updates, but only provided a callback for tournaments I am interested in, which is achieved by only binding to a specific msg topic. In my case, MLB has id 109, so I bind to “tournaments_109”.

Example code:

def _get_channels(self, socket_id: float):
    """
    Get a list of all channels and topics of each channel that you are allowing to subscribe to.
    Even though there are public and private channels, the channel id is unique for each API user.
    """
    auth_endpoint_url = urljoin(self.base_url, config.URL['mm_auth'])
    channels_response = requests.post(auth_endpoint_url,
                                      data={'socket_id': socket_id},
                                      headers=self.__get_auth_header())
    if channels_response.status_code != 200:
        logging.error("failed to get channels")
        raise Exception("failed to get channels")
    channels = channels_response.json()
    return channels.get('data', {}).get('authorized_channel', [])
def _get_connection_config(self):
    """
    Get websocket connection configurations. We are using Pusher as our websocket service,
    and only authenticated channels are used.
    More details can be found in https://pusher.com/docs/channels/using_channels/user-authentication/.
    The connection configuration is designed for stability and infrequent updates.
    However, in the unlikely event of a Pusher cluster incident,
    we will proactively migrate to a new cluster to ensure uninterrupted service.
    To maintain optimal connectivity, we recommend users retrieve the latest connection configuration
    at least once every thirty minutes. If the retrieved configuration remains unchanged,
    no further action is required. In the event of a new configuration being discovered,
    users should update their connection accordingly.
    """
    connection_config_url = urljoin(self.base_url, config.URL['websocket_config'])
    connection_response = requests.get(connection_config_url, headers=self.__get_auth_header())
    if connection_response.status_code != 200:
        logging.error("failed to get connection configs")
        raise Exception("failed to get channels")
    conn_configs = connection_response.json()
    return conn_configs

def subscribe(self):
    """
    1. Get Pusher connection configurations
    2. Connect to Pusher websocket service by providing authentication credentials
    3. Wait for the websocket connection successful handshake and then execute connect_handler in the callback
    4. Subscribe to public channels on selected topics
    5. Subscribe to private channels on all topics
    """
    connection_config = self._get_connection_config()  # not working yet, getting wrong config
    key = connection_config['key']
    cluster = connection_config['cluster']

    auth_endpoint_url = urljoin(self.base_url, config.URL['mm_auth'])
    auth_header = self.__get_auth_header()
    auth_headers = {
                       "Authorization": auth_header['Authorization'],
                       "header-subscriptions": '''[{"type":"tournament","ids":[]}]''',
                   }
    self.pusher = pysher.Pusher(key=key, cluster=cluster,
                                auth_endpoint=auth_endpoint_url,
                                auth_endpoint_headers=auth_headers)

    def public_event_handler(*args, **kwargs):
        print("processing public, Args:", args)
        print(f"event details {base64.b64decode(json.loads(args[0]).get('payload', '{}'))}")
        print("processing public, Kwargs:", kwargs)

    def private_event_handler(*args, **kwargs):
        print("processing private, Args:", args)
        print(f"event details {base64.b64decode(json.loads(args[0]).get('payload', '{}'))}")
        print("processing private, Kwargs:", kwargs)

    # We can't subscribe until we've connected, so we use a callback handler
    # to subscribe when able
    def connect_handler(data):
        socket_id = json.loads(data)['socket_id']
        available_channels = self._get_channels(socket_id)
        broadcast_channel_name = None
        private_channel_name = None
        private_events = None
        for channel in available_channels:
            if 'broadcast' in channel['channel_name']:
                broadcast_channel_name = channel['channel_name']
            else:
                private_channel_name = channel['channel_name']
                private_events = channel['binding_events']
        broadcast_channel = self.pusher.subscribe(broadcast_channel_name)
        private_channel = self.pusher.subscribe(private_channel_name)
        for t_id in self.my_tournaments:
            event_name = f'tournament_{t_id}'
            broadcast_channel.bind(event_name, public_event_handler)
            logging.info(f"subscribed to public channel, event name: {event_name}, successfully")

        for private_event in private_events:
            private_channel.bind(private_event['name'], private_event_handler)
            logging.info(f"subscribed to private channel, event name: {private_event['name']}, successfully")

    self.pusher.connection.bind('pusher:connection_established', connect_handler)
    self.pusher.connect()

Step 5: Start to Place/Cancel Plays

Endpoints:

  • partner/mm/place_wager
  • partner/mm/cancel_wager
  • partner/mm/place_batch_wagers
  • partner/mm/cancel_batch_wagers

The place_wager/place_batch_wagers endpoint needs to provide a unique “external_id” for each request. If the same external_id is provided for different transactions, only the first one will succeed, and the rest of the requests with the same external_id will be rejected. This is to prevent making the same plays too many times.

The cancel_wager/cancel_batch_wagers endpoint needs the same external_id used to place the play, plus the wager_id returned from the place_wager endpoint.

Since this is a sandbox environment I am using to test a dummy but working integration, I only added randomly make $1 plays on random games on the moneyline market only, and then randomly cancel some of them after 8 seconds.

Example code:

def start_playing(self):
    """
    Example on how to place wagers using single wager placement restful API, place_wager,
    also batch wager placement restful API place_multiple_wagers
    :return: Wager ids returned from the API are stored in a class object for wager cancellation example
    """
    logging.info("Start playing, randomly :)")
    play_url = urljoin(self.base_url, config.URL['mm_place_wager'])
    batch_play_url = urljoin(self.base_url, config.URL['mm_batch_place'])
    if '.prophetx.co' in play_url:
        raise Exception("only allowed to run in non production environment")
    for key in self.sport_events:
        one_event = self.sport_events[key]
        for market in one_event.get('markets', []):
            if market['type'] == 'moneyline':
                # only play on moneyline
                if random.random() < 0.3:   # 30% chance to play
                    for selection in market.get('selections', []):
                        if random.random() < 0.3: #30% chance to play
                            odds_to_play = self.__get_random_odds()
                            external_id = str(uuid.uuid1())
                            logging.info(f"going to play on '{one_event['name']}' on moneyline, side {selection[0]['name']} with odds {odds_to_play}")
                            body_to_send = {
                                'external_id': external_id,
                                'line_id': selection[0]['line_id'],
                                'odds': odds_to_play,
                                'stake': 1.0
                            }
                            play_response = requests.post(play_url, json=body_to_send,
                                                         headers=self.__get_auth_header())
                            if play_response.status_code != 200:
                                logging.info(f"failed to play, error {play_response.content}")
                            else:
                                logging.info("successfully")
                                self.wagers[external_id] = json.loads(play_response.content).get('data', {})['wager']['id']
                            # testing batch place wagers
                            batch_n = 3
                            external_id_batch = [str(uuid.uuid1()) for x in range(batch_n)]
                            batch_body_to_send = [{
                                'external_id': external_id_batch[x],
                                'line_id': selection[0]['line_id'],
                                'odds': odds_to_play,
                                'stake': 1.0
                            } for x in range(batch_n)]
                            batch_play_response = requests.post(batch_play_url, json={"data": batch_body_to_send},
                                                               headers=self.__get_auth_header())
                            if batch_play_response.status_code != 200:
                                logging.info(f"failed to play, error {play_response.content}")
                            else:
                                logging.info("successfully")
                                for wager in batch_play_response.json()['data']['succeed_wagers']:
                                    self.wagers[wager['external_id']] = wager['id']
def random_cancel_wager(self):
    wager_keys = list(self.wagers.keys())
    for key in wager_keys:
        wager_id = self.wagers[key]
        cancel_url = urljoin(self.base_url, config.URL['mm_cancel_wager'])
        if random.random() < 0.5:  # 50% cancel
            logging.info("start to cancel wager")
            body = {
                'external_id': key,
                'wager_id': wager_id,
            }
            response = requests.post(cancel_url, json=body, headers=self.__get_auth_header())
            if response.status_code != 200:
                if response.status_code == 404:
                    logging.info("already cancelled")
                    self.wagers.pop(key)
                else:
                    logging.info("failed to cancel")
            else:
                logging.info("cancelled successfully")
                self.wagers.pop(key)

Hopefully, this is helpful for you to integrate with Prophet’s API.