Skip to content
Snippets Groups Projects
Commit b66c0db9 authored by EC2 Default User's avatar EC2 Default User
Browse files

organizing repository as python package

parent 1f0e244f
No related branches found
No related tags found
No related merge requests found
Showing
with 104 additions and 160 deletions
......@@ -2,4 +2,6 @@ venv/
__pycache__
.vscode/
.ipynb_checkpoints
diagnosis_service/db
\ No newline at end of file
models/
digpath.db
digpath.egg-info/
File moved
# Drexel 2022-2023 AIML Capstone - Digital Pathology
All documentation can be found on our team Confluence page: https://drexel-cs591-dpath.atlassian.net/wiki/spaces/D2ACDP/overview
## Dependency Installation and Setup
A SQS queue is required to send messages and trigger the *diagnosis listener*. This service is configured to send messages to the `digpath-request` queue.
A SQL database is also required to run the service. Run the command below to create a database with a *request* table and *processing* table:
The *request* table will keep track of the requests that are submitted and which file was requested for processing/diagnosis, and the *processing* table will keep track of the progress for that request.
```
pip install -e .
cd db
python setup_db.py
```
## Running the application
Follow the instructions in the *digpath* folder to run the application.
\ No newline at end of file
# Digital Pathology Database
SQLite3 database file created and stored in this directory when `config.SQL_SERVER` is set to `False` and `python setup_db.py` is run.
\ No newline at end of file
# MySQL server vs SQLite file database: the backend is selected by config.SQL_SERVER.
# NOTE(review): this span interleaved two diff versions (old hardcoded credentials and
# a dead hardcoded DB_FILE connect that was immediately overwritten); reconstructed
# here as the current (config-driven) version only.
import os

from digpath import config

if config.SQL_SERVER:
    import mysql.connector

    # Connect to a MySQL server using credentials held in the package config.
    connection = mysql.connector.connect(
        host=config.DB_HOST,
        user=config.DB_USER,
        password=config.DB_PASS,
        database=config.DB_NAME
    )
else:
    import sqlite3

    # File-backed SQLite database; ensure the containing directory exists first.
    if not os.path.exists(config.DB_DIR):
        os.makedirs(config.DB_DIR)
    connection = sqlite3.connect(config.DB_FILE)

# cursor
crsr = connection.cursor()
......
File deleted
{
"info": "pretrained efficientnet fine-tuned 01/29/2023 on chips (including simulated)"
}
\ No newline at end of file
import os
import json
import time
import random
import contextlib
import traceback
from itertools import islice
import ray
import boto3
import model_manager_for_web_app
from database_connection import DigpathDatabase
from unified_image_reader import Image
from model_manager_for_web_app import ModelManager
# Seconds to wait when acquiring the diagnosis-runner status lock.
DIAGNOSIS_RUNNER_STATUS_LOCK_TIMEOUT = 1

# SQS queue that delivers diagnosis requests to this listener.
QUEUE_NAME = "digpath-request"
QUEUE = boto3.resource("sqs").get_queue_by_name(QueueName=QUEUE_NAME)

# MySQL server vs local SQLite file backend.
SQL_SERVER = False
if SQL_SERVER:
    import mysql.connector

    DB_HOST = "localhost"
    DB_USER = "digpath"
    DB_PASS = "password"  # NOTE(review): hardcoded credentials — move to config/secrets
    DB_NAME = "digpath"
    DIGPATH_DB = DigpathDatabase(
        mysql.connector.connect(host=DB_HOST, user=DB_USER, password=DB_PASS, database=DB_NAME)
    )
else:
    import sqlite3

    DB_FILE = "/home/ec2-user/db/digpath.db"
    # check_same_thread=False: the connection is shared across worker threads/tasks.
    DIGPATH_DB = DigpathDatabase(sqlite3.connect(DB_FILE, check_same_thread=False))
class DiagnosisRunner:
    """Wraps a ModelManager model and runs a diagnosis over a slice of an image's regions."""

    def __init__(self, model_name) -> None:
        """model_name is a model handled by ModelManagerForWebApp"""
        self.model_name = model_name
        self.model = ModelManager().load_model(self.model_name)

    def make_region_stream(self, img, start_percentage, stop_percentage):
        """Break up the image's region iterator for parallelization.

        Returns an iterator over the regions from start_percentage to
        stop_percentage of the image's total region count.
        """
        start = int(img.number_of_regions() * start_percentage)
        stop = int(img.number_of_regions() * stop_percentage)
        print(f'Processing chips {start} - {stop}')
        # BUG FIX: previously returned islice(img, 20000, 40000) — hard-coded
        # bounds left over from debugging that ignored the computed start/stop.
        return islice(img, start, stop)

    def do_diagnosis(
        self,
        image,
        db,
        request_id,
        process_id,
        start_percentage,
        stop_percentage
    ):
        """ converts filepath to region stream, then adds diagnosis to status """
        region_stream = self.make_region_stream(image, start_percentage, stop_percentage)
        diagnosis = self.model.diagnose(
            region_stream,
            total_chips=image.number_of_regions(),
            db=db,
            request_id=request_id,
            process_id=process_id,
            save_moderate=False,
            save_severe=True,
            save_bucket='digpath-predictions',
            save_prefix=f'{request_id}/save_severe',
            parallel=PARALLEL  # module-level flag set alongside DIAGNOSE_RUNNER
        )
        return diagnosis
model_manager_for_web_app.config.DEFAULT_MODELS_DIR = '/home/ec2-user/models/wrapped_models'
MODEL_NAME = 'zrahn_efficientnet_0219_full_parallel'
DIAGNOSE_RUNNER = DiagnosisRunner(MODEL_NAME)
PARALLEL = True
def process_request(message_data):
    """Run one diagnosis request described by an SQS message body.

    message_data keys: 'file' (image path), 'request_id', 'process_id',
    'start_percentage', 'stop_percentage'.

    Returns the diagnosis results dict, or {} when processing fails (the
    request is marked "error" in the database and the input file removed).
    """
    try:
        image = Image(message_data['file'])
        diagnosis_results = DIAGNOSE_RUNNER.do_diagnosis(
            image,
            DIGPATH_DB,
            message_data['request_id'],
            message_data['process_id'],
            message_data['start_percentage'],
            message_data['stop_percentage'],
        )
        print(f"Request: {message_data['request_id']}, Process: {message_data['process_id']} complete")
        request_info = DIGPATH_DB.get_request_info(message_data['request_id'])
        print(json.dumps(request_info, indent=2))
        # Once every process for this request has finished, the source image
        # is no longer needed on disk.
        if request_info['status'] == 'complete':
            print(f"Deleting {image.filepath}")
            os.remove(image.filepath)
        return diagnosis_results
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt and made the listener impossible to stop cleanly.
        traceback.print_exc()
        DIGPATH_DB.update_request(
            message_data['request_id'],
            None,
            None,
            0,
            0,
            0,
            "error"
        )
        # Best-effort cleanup: the input file may already be missing.
        with contextlib.suppress(OSError):
            os.remove(message_data['file'])
        return {}
if __name__ == "__main__":
    print(f"Diagnosis listener running. Listening for messages in the '{QUEUE_NAME}' queue")
    if PARALLEL:
        ray.init()
    # Poll forever. Each message is deleted immediately after receipt
    # (at-most-once processing) and then handed to process_request; any
    # error in the poll itself is logged and polling continues.
    while True:
        try:
            messages = QUEUE.receive_messages()
            for message in messages:
                message_body = message.body
                message.delete()
                print(f"processing message: {message_body}")
                message_data = json.loads(message_body)
                results = process_request(message_data)
        except Exception as e:
            print(e)
# Asynchronous Diagnosis Service
# Digital Pathology Services & Application
This service demonstrates asynchronous processing of a diagnosis service. When the endpoint is hit, a record will be added to a database to store the progress of a diagnosis request, a message will be sent to an SQS queue for a subscriber to run the diagnosis processing, and the user can get information about the progress throughout processing.
The Digital Pathology application and services demonstrate asynchronous processing with a front end GUI. When the `/diagnose` endpoint is hit, a record will be added to a database to store the progress of a diagnosis request, a message will be sent to an SQS queue for a subscriber to run the diagnosis processing, and the user can get information about the progress throughout processing.
## Dependency Installation
```
pip install pysqlite3 boto3 flask
```
## Setup
A SQS queue is also required to send messages and trigger the *prediction listener*. This service is configured to send messages to the `digpath-request` queue.
A SQL database is also required to run the service. Run the following command to create a SQLite database with a *request* table:
```
cd setup/
python setup.py
```
The *request* table will keep track of the requests that are submitted, which file was requested for the processing/diagnosis, and the progress of that processing including when the processing is completed.
## Running the Service
## Running the Request Service
Run the following command to start the flask service. This will expose two endpoints */diagnose* and */request_info*.
```
cd svc/
cd services/request_service
python run.py
```
In the same folder, run the following command to start the SQS subscriber that will run the diagnosis processing triggered when the SQS queue recieves a message.
Another service must be running to listen for SQS messages and make ML predictions/diagnosis. Run the following command to start the SQS subscriber that will run the diagnosis processing triggered when the SQS queue receives a message.
```
python prediction_listener.py
cd services/diagnosis_listener
python diagnosis_listener.py
```
## Testing the Service
......@@ -48,5 +31,7 @@ Again, hitting the */diagnose* endpoint will add a new row to the *request* DB t
You should see system logs from both the flask service and the prediction listener as the service processes the request. To check the progress of the service, run the following command:
```
curl localhost:5000/request_info -X GET -H 'Content-Type: application/json' -d '{"request_id": "REQUEST_ID"}'
curl localhost:5000/request_info/REQUEST_ID -X GET
```
Alternatively, you could run test_e2e.py in the `../ml` directory to test the service.
import os

# Directory holding wrapped model artifacts (../models relative to this file).
DEFAULT_MODELS_DIR = os.path.abspath(
    os.path.join(os.path.join(os.path.dirname(__file__), ".."), "models")
)

# S3 location of the trained model checkpoint and its registered name.
S3_MODEL_BUCKET = "digpath-models"
S3_MODEL_FILE = "training_runs/dc5b3a4954de4c1f82e0939ac83e8ac3/checkpoints/model_2023-02-20T010424_37.pth"
MODEL_NAME = 'zrahn_efficientnet_0219_full_parallel'

PARALLEL_PROCESSING = True

# Misspelled name kept for backward compatibility with existing callers;
# new code should use REQUEST_SERVICE_PORT.
REQUET_SERVICE_PORT = 5000
REQUEST_SERVICE_PORT = REQUET_SERVICE_PORT

REQUEST_SQS_NAME = "digpath-request"
REQUEST_SQS_URL = f"https://sqs.us-east-1.amazonaws.com/432722299252/{REQUEST_SQS_NAME}"

# MySQL server vs SQLite file database
SQL_SERVER = False
if SQL_SERVER:
    DB_HOST = "localhost"
    DB_USER = "digpath"
    DB_PASS = "password"  # NOTE(review): hardcoded credentials — move to env/secrets
    DB_NAME = "digpath"
else:
    # SQLite file lives in ../db relative to this file.
    DB_DIR = os.path.abspath(
        os.path.join(os.path.join(os.path.dirname(__file__), ".."), "db")
    )
    DB_FILE = os.path.abspath(os.path.join(DB_DIR, "digpath.db"))

# Fraction(s) of an image handled per diagnosis process (single slice by default).
IMG_SLICE_BREAKDOWN = [1]
DEBUG = False
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment