question:**Objective:** Assess the students' ability to handle and manipulate data using pandas, applying various data transformation and aggregation techniques.

# Problem Statement:

You have been given a dataset containing information about different products sold by a company. The dataset includes the following columns:

- `product_id`: A unique identifier for each product.
- `category`: Category of the product (e.g., electronics, furniture, etc.).
- `price`: The price of the product.
- `quantity_sold`: The quantity of the product sold.
- `date_of_sale`: The date when the product was sold.

You need to perform various data manipulations and analysis using pandas. Specifically, you need to:

1. **Read the Dataset:**
   - Read the dataset from a CSV file named `sales_data.csv`.
2. **Basic Data Cleaning:**
   - Handle any missing values in the `price` and `quantity_sold` columns by replacing them with the mean value of the respective columns.
3. **Data Transformation and Analysis:**
   - Create a new column `total_revenue` which is calculated as the product of `price` and `quantity_sold`.
   - Group the data by `category` and compute the total revenue and average price for each category.
   - Filter the data to include only those products that were sold after January 1, 2021.
4. **Pivot Table and Reshape:**
   - Create a pivot table that shows the total revenue for each `category` per month.
   - Reshape the pivot table to stack the months into a single column.
5. **Visualization:**
   - Plot a bar chart showing the total revenue for each category.

# Function Signature

```python
import pandas as pd

def analyze_sales_data(file_path: str) -> None:
    # Function to read the dataset, perform data cleaning, transformation, analysis,
    # create pivot table, reshape, and plot the results.
    pass
```

# Input:

- `file_path` (str): The path to the `sales_data.csv` file.

# Expected Output:

- The function should perform the above operations and plot a bar chart displaying the total revenue for each product category.

# Constraints:

- You are required to use pandas for all data manipulations.
- Ensure that all necessary imports are included in your solution.

# Example Usage:

```python
# Assuming the file sales_data.csv exists in the current directory
analyze_sales_data("sales_data.csv")
```

# Notes:

- For the bar chart, you can use pandas plotting capabilities or use matplotlib.
- Ensure that your solution handles edge cases, such as missing values or dates in different formats.

# Dataset Example (`sales_data.csv`):

```
product_id,category,price,quantity_sold,date_of_sale
1,electronics,500,4,2021-02-15
2,furniture,1500,2,2021-03-20
3,clothing,100,10,2020-12-25
4,electronics,,5,2021-01-05
5,furniture,1250,3,2021-02-10
```
answer:import pandas as pd
import matplotlib.pyplot as plt

def analyze_sales_data(file_path: str) -> None:
    # Read the dataset from the CSV file
    df = pd.read_csv(file_path)

    # Basic data cleaning: fill missing 'price' and 'quantity_sold' with the column mean.
    # Assign the result back instead of calling fillna(inplace=True) on a column
    # selection, which may not update df in recent pandas versions.
    df['price'] = df['price'].fillna(df['price'].mean())
    df['quantity_sold'] = df['quantity_sold'].fillna(df['quantity_sold'].mean())

    # Data transformation: add the 'total_revenue' column
    df['total_revenue'] = df['price'] * df['quantity_sold']

    # Group by 'category' and compute total revenue and average price
    category_group = df.groupby('category').agg(
        total_revenue=('total_revenue', 'sum'),
        average_price=('price', 'mean'),
    ).reset_index()

    # Filter data to include only sales after January 1, 2021.
    # .copy() makes the filtered frame independent, so adding a column below is safe.
    df['date_of_sale'] = pd.to_datetime(df['date_of_sale'])
    filtered_df = df[df['date_of_sale'] > pd.Timestamp('2021-01-01')].copy()

    # Pivot table showing total revenue for each 'category' per month
    filtered_df['month'] = filtered_df['date_of_sale'].dt.to_period('M')
    pivot_table = pd.pivot_table(
        filtered_df,
        values='total_revenue',
        index='month',
        columns='category',
        aggfunc='sum',
        fill_value=0,
    )
    pivot_table = pivot_table.reset_index()

    # Reshape the pivot table into long form: one row per (month, category) pair
    reshaped_pivot = pivot_table.melt(
        id_vars=['month'],
        value_vars=pivot_table.columns[1:],
        var_name='category',
        value_name='total_revenue',
    )

    # Visualization: bar chart of total revenue for each category
    category_total_revenue = df.groupby('category')['total_revenue'].sum()
    category_total_revenue.plot(kind='bar')
    plt.title('Total Revenue for Each Category')
    plt.xlabel('Category')
    plt.ylabel('Total Revenue')
    plt.show()
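
Because `analyze_sales_data` returns `None` and only plots, its numbers are hard to check directly. The following is a minimal sketch of a pandas-free cross-check, not part of the required solution: it recomputes the per-category revenue for the sample dataset using only the standard library, applying the same mean-fill rule. The names `SAMPLE_CSV` and `expected_revenue_by_category` are hypothetical helpers introduced here for illustration.

```python
import csv
import io
from collections import defaultdict
from statistics import mean

# Sample rows copied from the dataset example in the question.
SAMPLE_CSV = """product_id,category,price,quantity_sold,date_of_sale
1,electronics,500,4,2021-02-15
2,furniture,1500,2,2021-03-20
3,clothing,100,10,2020-12-25
4,electronics,,5,2021-01-05
5,furniture,1250,3,2021-02-10
"""


def expected_revenue_by_category(csv_text: str) -> dict:
    """Recompute total revenue per category with mean-filled missing values."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))

    # Column means over the non-missing values, mirroring fillna(column.mean()).
    fill = {
        col: mean(float(r[col]) for r in rows if r[col] != "")
        for col in ("price", "quantity_sold")
    }

    totals = defaultdict(float)
    for r in rows:
        price = float(r["price"]) if r["price"] != "" else fill["price"]
        qty = (
            float(r["quantity_sold"])
            if r["quantity_sold"] != ""
            else fill["quantity_sold"]
        )
        totals[r["category"]] += price * qty
    return dict(totals)


totals = expected_revenue_by_category(SAMPLE_CSV)
print(totals)
```

For the sample data the missing price is filled with 837.5 (the mean of 500, 1500, 100 and 1250), so the electronics total is 2000 + 4187.5 = 6187.5. These are the bar heights the plot should show if the pandas version is run on the same five rows.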
question:**Problem Statement:**

Write a Python program that sets up custom signal handlers for different types of signals. Your program should demonstrate the ability to:

1. Set and handle the `SIGINT` signal (typically sent using Ctrl+C).
2. Set and handle the `SIGALRM` signal using a timer.
3. Set and handle the `SIGUSR1` signal.
4. Utilize `signal.pause` to wait for a signal to be received.

**Function Signatures:**

```python
import signal
import sys

def handle_sigint(signum, frame):
    pass

def handle_sigalrm(signum, frame):
    pass

def handle_sigusr1(signum, frame):
    pass

def setup_signal_handlers():
    pass

def main():
    setup_signal_handlers()
    # Set an alarm for 5 seconds
    signal.alarm(5)
    # Wait for a signal to be received
    signal.pause()
```

**Detailed Requirements:**

1. Implement the `handle_sigint` function to print "SIGINT received, exiting." and exit the program using `sys.exit(0)`.
2. Implement the `handle_sigalrm` function to print "SIGALRM received, alarm triggered."
3. Implement the `handle_sigusr1` function to print "SIGUSR1 received."
4. Implement the `setup_signal_handlers` function to register the above handlers for their respective signals using `signal.signal`.
5. In the `main` function, call `setup_signal_handlers`, set an alarm to trigger in 5 seconds, and use `signal.pause` to wait for signals.

**Constraints:**

- The implemented signal handlers should properly print the messages described above.
- The `SIGINT` handler should exit the program gracefully.
- The code should be executable in a Unix environment where `SIGUSR1` and `SIGALRM` signals are available.

**Example Execution:**

When the program is run:

1. If the user sends a `SIGINT` using Ctrl+C, it should print "SIGINT received, exiting." and terminate the program.
2. If the `SIGALRM` fires after 5 seconds, it should print "SIGALRM received, alarm triggered."
3. If a `SIGUSR1` is sent to the process, it should print "SIGUSR1 received."

**Testing Guide:**

- Run the program and use `Ctrl+C` to ensure the `SIGINT` handler is triggered.
- Run the program and wait for 5 seconds to ensure the `SIGALRM` handler is triggered.
- Send a `SIGUSR1` signal to the process using the `kill` command from another terminal:

```bash
kill -USR1 <pid>
```

Replace `<pid>` with the program's process ID. Ensure the `SIGUSR1` handler is triggered and prints the appropriate message.

```python
import signal
import sys

def handle_sigint(signum, frame):
    print("SIGINT received, exiting.")
    sys.exit(0)

def handle_sigalrm(signum, frame):
    print("SIGALRM received, alarm triggered.")

def handle_sigusr1(signum, frame):
    print("SIGUSR1 received.")

def setup_signal_handlers():
    signal.signal(signal.SIGINT, handle_sigint)
    signal.signal(signal.SIGALRM, handle_sigalrm)
    signal.signal(signal.SIGUSR1, handle_sigusr1)

def main():
    setup_signal_handlers()
    # Set an alarm for 5 seconds
    signal.alarm(5)
    # Wait for a signal to be received
    signal.pause()

if __name__ == '__main__':
    main()
```
answer:import signal
import sys

def handle_sigint(signum, frame):
    print("SIGINT received, exiting.")
    sys.exit(0)

def handle_sigalrm(signum, frame):
    print("SIGALRM received, alarm triggered.")

def handle_sigusr1(signum, frame):
    print("SIGUSR1 received.")

def setup_signal_handlers():
    signal.signal(signal.SIGINT, handle_sigint)
    signal.signal(signal.SIGALRM, handle_sigalrm)
    signal.signal(signal.SIGUSR1, handle_sigusr1)

def main():
    setup_signal_handlers()
    # Set an alarm for 5 seconds
    signal.alarm(5)
    # Wait for a signal to be received
    signal.pause()

if __name__ == '__main__':
    main()
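
The testing guide sends `SIGUSR1` from a second terminal with `kill`. As a rough alternative, a process can send the signal to itself with `os.kill`, which makes the handler registration checkable from a single script. The sketch below assumes a Unix system and must run in the main thread; `send_usr1_to_self` and `record_signal` are hypothetical names used only for illustration, and the handler records the signal number instead of printing.

```python
import os
import signal


def send_usr1_to_self() -> list:
    """Install a temporary SIGUSR1 handler, signal this process, return what arrived."""
    received = []

    def record_signal(signum, frame):
        # Keep the handler tiny: just remember which signal number was delivered.
        received.append(signum)

    previous = signal.signal(signal.SIGUSR1, record_signal)
    try:
        # Same effect as `kill -USR1 <pid>` from another terminal.
        os.kill(os.getpid(), signal.SIGUSR1)
    finally:
        # Restore the earlier handler; signal.signal returns None when the
        # previous handler was not installed from Python, so guard for that.
        if previous is not None:
            signal.signal(signal.SIGUSR1, previous)
    return received


if __name__ == "__main__":
    print(send_usr1_to_self())
```

The same pattern could be pointed at `handle_sigusr1` from the answer by registering it in place of `record_signal`.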
question:You are provided with a log file containing temperature readings taken every 10 minutes throughout the day. Your task is to find the top N highest temperatures over the course of the day. Each temperature reading contains a timestamp and the temperature value.

Implement a function `find_top_n_temperatures(log_file: str, n: int) -> List[Tuple[str, float]]` that reads the log file, processes the temperature data, and returns a list of the top N highest temperature readings along with their timestamps.

# Input:

- `log_file`: A string representing the filename of the log file. Each line in the log file contains a timestamp and a temperature reading separated by a comma.
- `n`: An integer representing the number of highest temperatures to find.

# Output:

- A list of tuples where each tuple contains a timestamp (string) and a temperature (float). The list should be sorted in descending order by temperature.

# Constraints:

- The log file is guaranteed to have at least `n` temperature readings.
- The format of each line in the log file is 'timestamp,temperature'. For example:

```
2023-10-05 08:00,23.4
2023-10-05 08:10,24.1
2023-10-05 08:20,22.8
```

# Requirements:

- The function should efficiently handle large log files using the heapq module.
- The implementation should prioritize performance and operate in O(N log n) time complexity, where N is the total number of temperature readings in the log file.

# Example:

Given the following log file contents:

```
2023-10-05 08:00,23.4
2023-10-05 08:10,24.1
2023-10-05 08:20,22.8
2023-10-05 08:30,25.6
2023-10-05 08:40,24.9
2023-10-05 08:50,26.3
2023-10-05 09:00,23.5
```

And `n = 3`, calling `find_top_n_temperatures("temperature_log.txt", 3)` should return:

```
[
    ("2023-10-05 08:50", 26.3),
    ("2023-10-05 08:30", 25.6),
    ("2023-10-05 08:40", 24.9)
]
```

Make sure your code handles reading from the log file and correctly uses the `heapq` module to find the top N highest temperatures.
answer:import heapq
from typing import List, Tuple

def find_top_n_temperatures(log_file: str, n: int) -> List[Tuple[str, float]]:
    """
    Finds the top N highest temperatures from the log file.

    Args:
        log_file (str): The name of the log file containing temperature readings.
        n (int): The number of top highest temperatures to return.

    Returns:
        List[Tuple[str, float]]: A list of tuples where each tuple contains a timestamp
        and a temperature reading. The list is sorted in descending order by temperature.
    """
    min_heap = []

    # Iterate over the file object directly so a large log is never held in memory at once
    with open(log_file, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue  # skip blank lines, e.g. a trailing newline at the end of the file
            timestamp, temp_str = line.rsplit(',', 1)
            temperature = float(temp_str)

            # Keep at most n entries; the smallest of the current top n sits at the heap root
            if len(min_heap) < n:
                heapq.heappush(min_heap, (temperature, timestamp))
            else:
                heapq.heappushpop(min_heap, (temperature, timestamp))

    # Order the surviving entries from highest to lowest temperature
    largest_readings = heapq.nlargest(n, min_heap)
    result = [(ts, temp) for temp, ts in largest_readings]
    return result
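
The bounded-heap loop above can also be expressed with `heapq.nlargest` and a `key` function applied to a lazily parsed stream, which has the same O(N log n) behaviour. The sketch below is one possible variant, not the reference solution: `parse_readings`, `top_n_readings` and `SAMPLE_LOG` are hypothetical names, and an in-memory `io.StringIO` stands in for the log file so the example runs without touching disk.

```python
import heapq
import io
from typing import Iterable, Iterator, List, Tuple

# Log contents copied from the example in the question.
SAMPLE_LOG = """2023-10-05 08:00,23.4
2023-10-05 08:10,24.1
2023-10-05 08:20,22.8
2023-10-05 08:30,25.6
2023-10-05 08:40,24.9
2023-10-05 08:50,26.3
2023-10-05 09:00,23.5
"""


def parse_readings(lines: Iterable[str]) -> Iterator[Tuple[str, float]]:
    """Yield (timestamp, temperature) pairs one line at a time."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        timestamp, temp_str = line.rsplit(",", 1)
        yield timestamp, float(temp_str)


def top_n_readings(lines: Iterable[str], n: int) -> List[Tuple[str, float]]:
    """Return the n readings with the highest temperature, highest first."""
    return heapq.nlargest(n, parse_readings(lines), key=lambda reading: reading[1])


print(top_n_readings(io.StringIO(SAMPLE_LOG), 3))
```

Passing an open file object instead of the `StringIO` works the same way, since both yield lines when iterated.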
question:**Custom SMTP Server Implementation**

You are tasked with implementing a custom SMTP server using the deprecated `smtpd` module in Python. Your server should process incoming email messages and perform a specific action based on the content of the message.

# Requirements:

1. **Custom SMTP Server Class**:
   - Create a class `CustomSMTPServer` inheriting from `smtpd.SMTPServer`.
   - Override the `process_message` method to read the content of the email message.
2. **Process Email Messages**:
   - If the email message contains the word "hello" (case-insensitive) in its body, log the message to a file named `greetings.log`.
   - If the message size exceeds a given limit (e.g., 1024 bytes), return an error response to the client.
3. **Server Initialization**:
   - Instantiate the server with local address `('localhost', 1025)` and remote address `None`.

# Implementation:

- Define the `CustomSMTPServer` class as described.
- Implement the `process_message` method with the specified conditions.
- Ensure the server runs and is capable of handling incoming SMTP connections.

# Input:

The `process_message` method will receive the following parameters:

- `peer` (tuple): The remote host's address.
- `mailfrom` (str): The envelope originator.
- `rcpttos` (list): A list of envelope recipients.
- `data` (str or bytes): The contents of the email message.
- `**kwargs`: Additional keyword arguments.

# Output:

The `process_message` method should:

- Return "250 Ok" if the message is successfully processed.
- Return an appropriate error message in RFC 5321 format if the message size exceeds the limit.

# Example:

```python
import smtpd
import asyncore

class CustomSMTPServer(smtpd.SMTPServer):
    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        # Implement the message processing logic here.
        pass

# Initialize and run the server
server = CustomSMTPServer(('localhost', 1025), None)
asyncore.loop()
```

**Note**: Ensure you handle the case when the message data is in bytes or string format depending on the `decode_data` parameter.
answer:import smtpd
import asyncore
import logging

class CustomSMTPServer(smtpd.SMTPServer):
    MAX_MESSAGE_SIZE = 1024  # limit in bytes

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        logging.basicConfig(filename='greetings.log', level=logging.INFO)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        # Measure the size in bytes whether data arrives as bytes or as str (decode_data=True)
        raw = data if isinstance(data, bytes) else data.encode('utf-8')
        if len(raw) > self.MAX_MESSAGE_SIZE:
            return '552 Message size exceeds maximum limit'

        # Work on text from here on; undecodable bytes are replaced rather than raising
        text = raw.decode('utf-8', errors='replace')

        # Check for the word 'hello' in the message content
        if 'hello' in text.lower():
            logging.info(f"From: {mailfrom}, To: {rcpttos}, Data: {text}")

        return '250 Ok'

# Initialize and run the server
if __name__ == "__main__":
    server = CustomSMTPServer(('localhost', 1025), None)
    asyncore.loop()
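
Since `smtpd` and `asyncore` were removed in Python 3.12, the server above cannot be imported on newer interpreters. One way to keep the size and greeting rules testable anywhere is to factor them into a plain function that `process_message` could delegate to. The sketch below is an illustration under that assumption; `classify_message` is a hypothetical helper, not part of `smtpd`.

```python
from typing import Tuple, Union

MAX_MESSAGE_SIZE = 1024  # bytes, matching the limit used in the answer


def classify_message(
    data: Union[str, bytes], max_size: int = MAX_MESSAGE_SIZE
) -> Tuple[str, bool]:
    """Return (SMTP response, should_log) for a message body.

    The response is '552 ...' when the body exceeds max_size bytes, otherwise
    '250 Ok'. should_log is True when an accepted body contains 'hello' in any
    letter case.
    """
    # Normalise to bytes first so the limit is always measured in bytes.
    raw = data if isinstance(data, bytes) else data.encode("utf-8")
    if len(raw) > max_size:
        return "552 Message size exceeds maximum limit", False

    text = raw.decode("utf-8", errors="replace")
    return "250 Ok", "hello" in text.lower()


print(classify_message(b"Hello there"))
print(classify_message("x" * 2000))
```

With this split, `process_message` would only need to call the helper, write to `greetings.log` when the second element is true, and return the first element.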