How To Dev - Example: Using Advanced Smart City API in Python

 

See an example of using Python to develop a MicroService and make it accessible from a Proc.Logic / IoT App: https://www.snap4city.org/645

See also how to use the API from Python: https://www.snap4city.org/829


Get a token in Python


import requests

# This is only an example; do not use it in production.

# Note that every call creates a new session on Keycloak,

# so the token should be reused until it expires; once expired, it can be 'refreshed' using the refresh token.

 

def getTokenViaUserCredentials(username,password):
    """Request a token from the Keycloak token endpoint with user credentials.

    Sends a form-encoded POST using the 'password' grant type. Example code
    only: every call performs a fresh login, so a real application should
    keep the returned token and reuse it instead of calling this repeatedly.

    Args:
        username: Login name sent as the 'username' form field.
        password: Password sent as the 'password' form field.

    Returns:
        The decoded JSON body of the response, whatever the HTTP status.
        Callers are expected to check for the 'access_token' key to tell
        success from failure.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the server does not answer within the timeout.
        ValueError: If the response body is not valid JSON.
    """
    payload = {
        'f': 'json',
        # Placeholder value: replace with the actual client id.
        'client_id': 'xxxxx-tool',
        'grant_type': 'password',
        'username': username,
        'password': password
    }

    header = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    urlToken = "https://www.snap4city.org/auth/realms/master/protocol/openid-connect/token"
    # Without an explicit timeout, requests may wait indefinitely for the server.
    response = requests.request("POST", urlToken, data=payload, headers=header, timeout=30)
    return response.json()

 

def getTokenViaRefreshToken(old_token):
    """Request a new token from the Keycloak token endpoint via a refresh token.

    Sends a form-encoded POST using the 'refresh_token' grant type.

    Args:
        old_token: A previously obtained token response (a dict). Its
            'refresh_token' entry is sent to the token endpoint.

    Returns:
        None when old_token is empty/None or has no 'refresh_token' key
        (no request is made in that case); otherwise the decoded JSON body
        of the response, whatever the HTTP status.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the server does not answer within the timeout.
        ValueError: If the response body is not valid JSON.
    """
    # Nothing to refresh with: bail out before touching the network.
    if not old_token or 'refresh_token' not in old_token:
        return None

    payload = {
        'f': 'json',
        # Placeholder value: replace with the actual client id.
        'client_id': 'xxxxx-tool',
        'grant_type': 'refresh_token',
        'refresh_token': old_token['refresh_token']
    }

    header = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    urlToken = "https://www.snap4city.org/auth/realms/master/protocol/openid-connect/token"
    # Without an explicit timeout, requests may wait indefinitely for the server.
    response = requests.request("POST", urlToken, data=payload, headers=header, timeout=30)
    return response.json()


Access data via Smart City API: get historical and last data via ServiceURI, in Python


# Get the token of the user (replace the placeholder credentials).
token = getTokenViaUserCredentials('…user…', '…password…')

# NOTE: this prints the whole token response, including the access token;
# keep it for debugging only.
print('token', token)

if 'access_token' in token:
    head = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token['access_token']}"
    }
    # The <xxxxx> parts are placeholders to be replaced with real values.
    api_url = "https://<xxxxx>/ServiceMap/api/v1/"
    service_uri = "http://www.disit.org/km4city/resource/iot/<xxxxx>/<xxxxx>/ccccc_Device_01"

    # Hand the query to requests via `params` so the ServiceURI (itself a URL)
    # is percent-encoded instead of being concatenated raw into the query string.
    # NOTE(review): fromTime="60-day" presumably selects the last 60 days of
    # history; confirm against the Smart City API documentation.
    query = {"serviceUri": service_uri, "fromTime": "60-day"}

    # Without an explicit timeout, requests may wait indefinitely for the server.
    response = requests.request("GET", api_url, params=query, headers=head, timeout=30)
    print(response.json())
else:
    print('failed getting access token', token)


Send data to the platform via the Broker: send/update the values of a device, in Python


# Update the attribute values of a device through the broker, reusing the
# `token` obtained earlier in this script.
if 'access_token' in token:
    head = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token['access_token']}"
    }
    # The <xxxxx> part is a placeholder to be replaced with a real value.
    api_url = "https://iot-app.snap4city.org/orion-broker-<xxxxx>/v2/"
    device_id = "ccccc_Device_01"

    # One entry per attribute to update, each with its value and declared type.
    payload = {
        "dateObserved": {
            "value": "2023-01-27T01:01:02.000Z",
            "type": "string"
        },
        "FatigueValue": {
            "value": 13.1,
            "type": "number"
        }
    }

    # Let requests build the query string rather than concatenating it by hand.
    query = {"type": "Sensor", "elementid": device_id}

    # Without an explicit timeout, requests may wait indefinitely for the server.
    response = requests.request(
        "PATCH",
        api_url + "entities/" + device_id + "/attrs",
        params=query,
        json=payload,
        headers=head,
        timeout=30,
    )

    # 204 is the only status treated as success here.
    if response.status_code != 204:
        try:
            print(response.json())
        except ValueError:
            # The error body is not JSON (e.g. an HTML error page): show it raw
            # instead of crashing while reporting the failure.
            print(response.status_code, response.text)
    else:
        print(device_id+" updated!")
else:
    print('failed getting access token', token)