How to Build a Web Traffic Monitor with Python, Flask, SQLite, and Pusher

If you have a web application running out there on the internet, you will need to know where your visitors are coming from, the systems they’re using, and other such things.

Although you can use services such as Google Analytics or Monster Insights, it’s more fun to build your own monitoring system using Python, a SQL database, and Pusher for real-time data updates.

In today’s tutorial, we’ll go over how to create such a tool using Python, Flask, SQLite, and Pusher. The tutorial is a heavily customized spin-off of a tutorial published on Pusher’s official page.

Requirements

For this build, you will need to know how to work with the Python programming language, basic web development, and APIs.

Installing requirements

Start by installing Python on your system. You will also need the Flask, Pusher, and httpagentparser packages, which can be installed with pip (for example, pip install flask pusher httpagentparser).
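
Before moving on, it can help to confirm that the packages are importable. The snippet below is a minimal sketch rather than part of the original build; the helper name missing_packages is hypothetical, and the module names are taken from the imports used later in this tutorial.

```python
import importlib.util

def missing_packages(names):
    """Return the module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Module names as they are imported later in the tutorial.
REQUIRED = ["flask", "pusher", "httpagentparser"]

missing = missing_packages(REQUIRED)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are installed.")
```

If anything is reported as missing, install it with pip and run the check again.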

Creating the database

The first step is to create a database where the data is stored. Python ships with the sqlite3 module in its standard library, so there is nothing extra to install. Create a file called database.py and enter the code below:

    import sqlite3
    from sqlite3 import Error
    def create_connection(database):
            try:
                conn = sqlite3.connect(
                    database, isolation_level=None, check_same_thread=False)
                conn.row_factory = lambda c, r: dict(
                    zip([col[0] for col in c.description], r))
                return conn
            except Error as e:
                print(e)
    def create_table(c, sql):
            c.execute(sql)
    def update_or_create_page(c, data):
            sql = "SELECT * FROM pages where name=? and session=?"
            c.execute(sql, data[:-1])
            result = c.fetchone()
            if result is None:
                create_pages(c, data)
            else:
                print(result)
                update_pages(c, result['id'])
    def create_pages(c, data):
            print(data)
            sql = ''' INSERT INTO pages(name,session,first_visited)
                      VALUES (?,?,?) '''

            c.execute(sql, data)
    def update_pages(c, pageId):
            print(pageId)
            sql = ''' UPDATE pages
                      SET visits = visits+1
                      WHERE id = ?'''

            c.execute(sql, [pageId])
    def create_session(c, data):
            sql = ''' INSERT INTO sessions(ip, continent, country, city, os, browser, session, created_at)
                      VALUES (?,?,?,?,?,?,?,?) '''

            c.execute(sql, data)
    def select_all_sessions(c):
            sql = "SELECT * FROM sessions"
            c.execute(sql)
            rows = c.fetchall()
            return rows
    def select_all_pages(c):
            sql = "SELECT * FROM pages"
            c.execute(sql)
            rows = c.fetchall()
            return rows
    def select_all_user_visits(c, session_id):
            sql = "SELECT * FROM pages where session =?"
            c.execute(sql, [session_id])
            rows = c.fetchall()
            return rows
    def main():
            database = "./pythonsqlite.db"
            sql_create_pages = """
                CREATE TABLE IF NOT EXISTS pages (
                    id integer PRIMARY KEY,
                    name varchar(225) NOT NULL,
                    session varchar(255) NOT NULL,
                    first_visited datetime NOT NULL,
                    visits integer NOT NULL Default 1
                );
            """

            sql_create_session = """
                CREATE TABLE IF NOT EXISTS sessions (
                    id integer PRIMARY KEY,
                    ip varchar(225) NOT NULL,
                    continent varchar(225) NOT NULL,
                    country varchar(225) NOT NULL,
                    city varchar(225) NOT NULL,
                    os varchar(225) NOT NULL,
                    browser varchar(225) NOT NULL,
                    session varchar(225) NOT NULL,
                    created_at datetime NOT NULL
                );
            """

            # create a database connection
            conn = create_connection(database)
            if conn is not None:
                # create tables
                create_table(conn, sql_create_pages)
                create_table(conn, sql_create_session)
                print("Connection established!")
            else:
                print("Could not establish connection")
     
    if __name__ == '__main__':
            main()

Save the file and run the script to create the database with the relevant tables.

    python database.py

If the connection succeeds, the script prints:

    Connection established!
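
To see how the page-tracking logic in database.py behaves, here is a small self-contained sketch that reproduces the same idea against an in-memory database, so it does not touch pythonsqlite.db. The function name record_visit and the sample session value are hypothetical; the table definition and the select-then-insert-or-update flow mirror update_or_create_page above.

```python
import sqlite3

def dict_factory(cursor, row):
    """Same idea as the row_factory lambda in database.py: rows become dicts."""
    return dict(zip([col[0] for col in cursor.description], row))

# In-memory database so the sketch leaves no file behind.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.row_factory = dict_factory
conn.execute("""
    CREATE TABLE IF NOT EXISTS pages (
        id integer PRIMARY KEY,
        name varchar(225) NOT NULL,
        session varchar(255) NOT NULL,
        first_visited datetime NOT NULL,
        visits integer NOT NULL DEFAULT 1
    )
""")

def record_visit(conn, name, session, visited_at):
    """Insert a row on the first visit, otherwise increment its counter."""
    row = conn.execute(
        "SELECT id FROM pages WHERE name=? AND session=?", (name, session)
    ).fetchone()
    if row is None:
        conn.execute(
            "INSERT INTO pages(name, session, first_visited) VALUES (?,?,?)",
            (name, session, visited_at),
        )
    else:
        conn.execute(
            "UPDATE pages SET visits = visits + 1 WHERE id = ?", (row["id"],)
        )

# Two visits to the same page in the same session yield one row with visits = 2.
record_visit(conn, "home", "abc123", "2021-01-13 02:52:32")
record_visit(conn, "home", "abc123", "2021-01-13 02:53:10")
pages = conn.execute("SELECT * FROM pages").fetchall()
print(pages)
```

The first call creates the row with the default visit count of 1, and the second call finds it and increments the counter instead of inserting a duplicate.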

Next, head over to Pusher and create an account. Then create an application and follow the wizard to set it up. Once completed, copy the app keys and pass them to the Pusher constructor as shown below.

    pusher = Pusher(
        app_id = "1079412",
        key = "e5d266a24f3502d2b814",
        secret = "bab634d2398eb5fcb0f8",
        cluster = "us2")
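
Hard-coding the keys is fine for a local experiment, but you may prefer to keep them out of the source file. The following is a sketch of one way to do that, assuming the credentials are exported as environment variables; the variable names (PUSHER_APP_ID and so on) and the helper load_pusher_config are hypothetical choices, not something Pusher requires.

```python
import os

# Hypothetical environment variable names; pick whatever suits your setup.
REQUIRED_VARS = {
    "app_id": "PUSHER_APP_ID",
    "key": "PUSHER_KEY",
    "secret": "PUSHER_SECRET",
    "cluster": "PUSHER_CLUSTER",
}

def load_pusher_config(environ=os.environ):
    """Build the keyword arguments for Pusher() from environment variables."""
    missing = [var for var in REQUIRED_VARS.values() if not environ.get(var)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {arg: environ[var] for arg, var in REQUIRED_VARS.items()}

# Usage (requires the pusher package and the variables to be set):
#     from pusher import Pusher
#     pusher = Pusher(**load_pusher_config())
```

The helper fails early with a clear message when a variable is missing, which is easier to debug than a rejected request to Pusher later on.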

Finally, create a Flask application and build the backend as shown in the code below:

    from flask import Flask, render_template, request, session, jsonify
    import urllib.request
    from pusher import Pusher
    from datetime import datetime
    import httpagentparser
    import json
    import os
    import hashlib
    from database import create_connection, create_session, update_or_create_page, select_all_sessions, select_all_user_visits, select_all_pages
    app = Flask(__name__)
    app.secret_key = os.urandom(24)
    # configure pusher object
    pusher = Pusher(
        app_id = "1079412",
        key = "e5d266a24f3502d2b814",
        secret = "bab634d2398eb5fcb0f8",
        cluster = "us2")
    database = "./pythonsqlite.db"
    conn = create_connection(database)
    c = conn.cursor()
     
    userOS = None
    userIP = None
    userCity = None
    userBrowser = None
    userCountry = None
    userContinent = None
    sessionID = None
    def main():
        global conn, c
    def parseVisitor(data):
        update_or_create_page(c, data)
        pusher.trigger(u'pageview', u'new', {
            u'page': data[0],
            u'session': sessionID,
            u'ip': userIP
        })
        pusher.trigger(u'numbers', u'update', {
            u'page': data[0],
            u'session': sessionID,
            u'ip': userIP
        })
    @app.before_request
    def getAnalyticsData():
        global userOS, userBrowser, userIP, userContinent, userCity, userCountry, sessionID
        userInfo = httpagentparser.detect(request.headers.get('User-Agent'))
        userOS = userInfo['platform']['name']
        userBrowser = userInfo['browser']['name']
        userIP = "196.207.130.148" if request.remote_addr == '127.0.0.1' else request.remote_addr
        api = "https://www.iplocate.io/api/lookup/" + userIP
        try:
            resp = urllib.request.urlopen(api)
            result = resp.read()
            result = json.loads(result.decode("utf-8"))
            userCountry = result["country"]
            userContinent = result["continent"]
            userCity = result["city"]
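        except Exception as e:
            # The lookup is best-effort: fall back to placeholders so the
            # NOT NULL columns in the sessions table can still be filled.
            print("Could not find: ", userIP, e)
            userCountry = userContinent = userCity = "Unknown"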
        getSession()
    def getSession():
        global sessionID
        time = datetime.now().replace(microsecond=0)
        if 'user' not in session:
            lines = (str(time)+userIP).encode('utf-8')
            session['user'] = hashlib.md5(lines).hexdigest()
            sessionID = session['user']
            pusher.trigger(u'session', u'new', {
                u'ip': userIP,
                u'continent': userContinent,
                u'country': userCountry,
                u'city': userCity,
                u'os': userOS,
                u'browser': userBrowser,
                u'session': sessionID,
                u'time': str(time),
            })
            data = [userIP, userContinent, userCountry,
                    userCity, userOS, userBrowser, sessionID, time]
            create_session(c, data)
        else:
            sessionID = session['user']
    @app.route('/')
    def index():
        data = ['home', sessionID, str(datetime.now().replace(microsecond=0))]
        parseVisitor(data)
        return f'User data: {data}'
    @app.route('/get-all-sessions')
    def get_all_sessions():
        data = []
        dbRows = select_all_sessions(c)
        for row in dbRows:
            data.append({
                'ip': row['ip'],
                'continent': row['continent'],
                'country': row['country'],
                'city': row['city'],
                'os': row['os'],
                'browser': row['browser'],
                'session': row['session'],
                'time': row['created_at']
            })
        return jsonify(data)
     
     
    if __name__ == '__main__':
        main()
        app.run(debug=True)

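Once completed, run the app with the command flask run (Flask detects the file automatically if it is named app.py; otherwise, set FLASK_APP or run the file directly with python) and navigate to 127.0.0.1:5000/. Each visit logs the session information for the visitor's IP address, including the browser, operating system, country, and city.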
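
The session identifier stored for each visitor comes from getSession(), which hashes the timestamp (to the second) together with the IP address. The sketch below isolates that step so you can experiment with it; make_session_id is a hypothetical helper that mirrors the tutorial's logic, while make_random_session_id is an alternative that is not part of the original code.

```python
import hashlib
import uuid
from datetime import datetime

def make_session_id(ip, now):
    """Mirror getSession(): MD5 of the second-resolution timestamp plus the IP."""
    stamp = now.replace(microsecond=0)
    return hashlib.md5((str(stamp) + ip).encode("utf-8")).hexdigest()

def make_random_session_id():
    """Alternative (not from the tutorial): a random 32-character hex ID."""
    return uuid.uuid4().hex

moment = datetime(2021, 1, 13, 2, 52, 32)
first = make_session_id("196.207.130.148", moment)
second = make_session_id("196.207.130.148", moment)

# Two visitors behind the same IP in the same second get the same ID.
print(first == second)
```

Because the hash depends only on the time and the IP, visitors sharing an address (for example, behind the same router) can collide if they arrive within the same second. A random identifier avoids that, at the cost of no longer being derivable from the request.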

To view all the logged sessions, go to 127.0.0.1:5000/get-all-sessions.

    [
       {
          "browser":"Chrome",
          "city":"New York",
          "continent":"North America",
          "country":"United States",
          "ip":"192.148.18.103",
          "os":"Linux",
          "session":"9a5d6a84d93ad62a599293acb2e751a1",
          "time":"2021-01-13 02:52:32"
       },
       {
          "browser":"Mozilla",
          "city":"Oregon",
          "continent":"North America",
          "country":"United States",
          "ip":"66.115.149.229",
          "os":"Windows",
          "session":"64d205c98c839e1d346c733ffd41b27f",
          "time":"2021-01-13 02:54:12"
       },
       {
          "browser":"Chrome",
          "city":"Ogden",
          "continent":"North America",
          "country":"United States",
          "ip":"172.231.59.124",
          "os":"Windows",
          "session":"3fd564c16a32b5139a8dd0578e36aded",
          "time":"2021-01-13 02:54:37"
       },
       {
          "browser":"Chrome",
          "city":"New York",
          "continent":"North America",
          "country":"United States",
          "ip":"72.229.28.185",
          "os":"Windows",
          "session":"27ad92271023888427da216de10a7cae",
          "time":"2021-01-13 02:55:07"
       },
       {
          "browser":"Chrome",
          "city":"Nairobi",
          "continent":"Africa",
          "country":"Kenya",
          "ip":"196.207.130.148",
          "os":"Linux",
          "session":"c92cdab9eefa2fe121d49264986e7345",
          "time":"2021-01-13 02:56:43"
       },
       {
          "browser":"Chrome",
          "city":"Nairobi",
          "continent":"Africa",
          "country":"Kenya",
          "ip":"196.207.130.148",
          "os":"Windows",
          "session":"31ee28ec6a655e0fa13be4dba8c13861",
          "time":"2021-01-13 03:11:49"
       }
    ]

With the app running, you can change your IP address and browser to collect enough information for your database. With the data gathered, you can use tools such as the ELK stack to visualize it and see which locations and browsers visit the application most often.
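
Before reaching for a full visualization stack, a quick tally in Python already answers the basic questions. This is a minimal sketch; count_by is a hypothetical helper, and the sample rows keep only the browser and country fields from the JSON output shown above.

```python
from collections import Counter

def count_by(rows, field):
    """Count how often each value of `field` occurs in the session rows."""
    return Counter(row[field] for row in rows)

# Browser and country values taken from the sample output above.
sessions = [
    {"browser": "Chrome", "country": "United States"},
    {"browser": "Mozilla", "country": "United States"},
    {"browser": "Chrome", "country": "United States"},
    {"browser": "Chrome", "country": "United States"},
    {"browser": "Chrome", "country": "Kenya"},
    {"browser": "Chrome", "country": "Kenya"},
]

by_country = count_by(sessions, "country")
by_browser = count_by(sessions, "browser")

print(by_country.most_common())  # [('United States', 4), ('Kenya', 2)]
print(by_browser.most_common())  # [('Chrome', 5), ('Mozilla', 1)]
```

In practice, the rows would come from the /get-all-sessions endpoint or directly from the sessions table rather than from a hand-written list.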

The following is an example visualization of collected data from the app above.

Conclusion

In this tutorial, we used Python, SQLite, and Pusher to collect information about users visiting the website and then used the data to create visualizations.

To keep things simple, I limited the app output to the console and JSON to accommodate those who have not worked with Flask's Jinja templating.

This simple app is open to expansion into a full-fledged web analytics tool. Consider the resources below for additional knowledge:

About the author

John Otieno

My name is John, and I am a fellow geek like you. I am passionate about all things computers, from hardware and operating systems to programming. My dream is to share my knowledge with the world and help out fellow geeks. Follow my content by subscribing to the LinuxHint mailing list.