Run Class methods in threads (python)
If you call them from the class, it is as simple as:
import threadingclass DomainOperations: def __init__(self): self.domain_ip = '' self.website_thumbnail = '' def resolve_domain(self): self.domain_ip = 'foo' def generate_website_thumbnail(self): self.website_thumbnail= 'bar' def run(self): t1 = threading.Thread(target=self.resolve_domain) t2 = threading.Thread(target=self.generate_website_thumbnail) t1.start() t2.start() t1.join() t2.join() print(self.domain_ip, self.website_thumbnail)if __name__ == '__main__': d = DomainOperations() d.run()
You can inherit Thread class in DomainOperation, in this way code would be more clean and easily understandable. you have to override a run() method.
from threading import Threadclass DomainOperations(Thread): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.domain_ip = '' self.website_thumbnail = '' def resolve_domain(self): self.domain_ip = 'foo' def generate_website_thumbnail(self): self.website_thumbnail= 'bar' def run(self): #domain will be resolved on first thread self.resolve_domain() #thumbnail will be resolved on second OR newly created below thread thread2 = Thread(target=self.generate_website_thumbnail) thread.start() # thread1 will wait for thread2 self.join() # thread2 will wait for thread1, if it's late. thread2.join() # here it will print ip and thumbnail before exiting first thread print(self.domain_ip, self.website_thumbnail)
And you will start your threads in this way.
if __name__ == '__main__': thread1 = DomainOperations() thread1.start()
def post_test(tbid, line_num, response_time): """ :param tbid: 参数id :return: """ # 请求参数 data = {'tbId': tbid, 'conditions': [{"key": "", "type": 1}], 'pageNum': 1, 'pageSize': 12} # 请求启动时间 start = time.time() # post请求 r = requests.post(url=url, data=json.dumps(data), headers=headers) # 请求结束时间 end = time.time() # 保留两位小数 finall_time = float('%.2f' % float(end - start)) text = json.loads(r.text) # IO写入 只写入200的 with open('text6.csv', 'a', newline='') as csvfile: if text['statusCode'] == '200': throughput = line_num * response_time / finall_time throughput = float('%.2f' % float(throughput)) print('the perf_counter time of %s is %s and the content is %s ,throughput is %s' % ( tbid, finall_time, json.loads(r.text), throughput)) spamwriter = csv.writer(csvfile, dialect='excel') spamwriter.writerow([tbid] + [finall_time] + [throughput])def start_thread(csv_name): tbid, response_time_sort, throughput_sort = read_csv(csv_name) print(tbid) line_num = len(tbid) response_times = 5 for j in range(response_times): for i in tbid: t = threading.Thread(target=post_test, args=(i, line_num, response_times)) t.start() t.join()
I don't know how to call a method in a class, especially if it has initialization parameters, but you can try this method。 I'm trying to use multiple processes to solve this problem, right。