Processing requests in asynchronous mode
Posted: 06.05.2011, 09:59
Since not found an example using queries asynchronously, the only solution was to import the missing functions PQgetCancel, PQfreeCancel, PQcancel and use them in a stream.
Patch was written for version 6.6.4, but 6.6.6 and 7.0 do not differ much.
Example of use:
I will be grateful any amendments and comments!
Patch was written for version 6.6.4, but 6.6.6 and 7.0 do not differ much.
Code: Select all
ZPlainPostgreSql7.pas, ZPlainPostgreSql8.pas:
after
TPQconsumeInput
add
TPQgetCancel = function(Handle: PPGconn): PGcancel; cdecl;
TPQfreeCancel = procedure(Canc: PGcancel); cdecl;
TPQcancel = function(Canc: PGcancel; Buffer: PChar; BufSize: Integer): Integer; cdecl;
after
PQconsumeInput: TPQconsumeInput;
add
PQgetCancel: TPQgetCancel;
PQfreeCancel: TPQfreeCancel;
PQcancel: TPQcancel;
after
@PQconsumeInput := GetAddress('PQconsumeInput');
add
PQgetCancel := GetAddress('PQgetCancel');
PQfreeCancel := GetAddress('PQfreeCancel');
PQcancel := GetAddress('PQcancel');
ZPlainPostgreSqlDriver.pas:
after
PZPostgreSQLResult = Pointer;
add type PZPostgreSQLCancel:
PZPostgreSQLCancel = Pointer;
IZPostgreSQLPlainDriver, TZPostgreSQL7PlainDriver, TZPostgreSQL8PlainDriver
after
function ConsumeInput
add functions:
function GetCancel(Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
procedure FreeCancel( Canc: PZPostgreSQLCancel);
function Cancel( Canc: PZPostgreSQLCancel; Buffer: PChar;
Length: Integer): Integer;
function TZPostgreSQL7PlainDriver.GetCancel(
Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
begin
Result := ZPlainPostgreSql7.PQgetCancel(Handle);
end;
procedure TZPostgreSQL7PlainDriver.FreeCancel(Canc: PZPostgreSQLCancel);
begin
ZPlainPostgreSql7.PQfreeCancel( Canc);
end;
function TZPostgreSQL7PlainDriver.Cancel(Canc: PZPostgreSQLCancel;
Buffer: PChar; Length: Integer): Integer;
begin
Result := ZPlainPostgreSql7.PQcancel( Canc, Buffer, Length);
end;
function TZPostgreSQL8PlainDriver.GetCancel(
Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
begin
Result := ZPlainPostgreSql8.PQgetCancel(Handle);
end;
procedure TZPostgreSQL8PlainDriver.FreeCancel(Canc: PZPostgreSQLCancel);
begin
ZPlainPostgreSql8.PQfreeCancel( Canc);
end;
function TZPostgreSQL8PlainDriver.Cancel(Canc: PZPostgreSQLCancel;
Buffer: PChar; Length: Integer): Integer;
begin
Result := ZPlainPostgreSql8.PQcancel( Canc, Buffer, Length);
end;
Code: Select all
unit SQLWaitThread;
interface
uses Classes, ZConnection, ZPlainPostgreSqlDriver, ZDbcPostgreSql;
type
TSQLWaitThread = class( TThread)
private
FZConnection: TZConnection;
FDriver: IZPostgreSQLPlainDriver;
FHandle: PZPostgreSQLConnect;
FSQL: string;
FStateStr: string;
procedure SetStateStr( const aStateStr: string);
protected
procedure Execute; override;
procedure PGException( const aText: string);
public
constructor Create; override;
destructor Destroy; override;
procedure SetConnParams( aProtocol: string; aHostName: string; aPort: integer;
aDatabase: string; aUser: string; aPassword: string);
property SQL: string read FSQL write FSQL;
property StateStr: string read FStateStr write SetStateStr;
end;
implementation
uses SysUtils;
procedure SQLWaitThread_NoticeProcessor( aArg: Pointer; aMessage: PChar); cdecl;
var
lpSQLWaitThread: TSQLWaitThread;
begin
lpSQLWaitThread := aArg;
lpSQLWaitThread.StateStr := aMessage;
end;
constructor TSQLWaitThread.Create;
begin
inherited Create;
FZConnection := TZConnection.Create( nil);
end;
destructor TSQLWaitThread.Destroy;
begin
FreeAndNil( FZConnection);
inherited Destroy;
end;
procedure TSQLWaitThread.SetConnParams(aProtocol, aHostName: string;
aPort: integer; aDatabase, aUser, aPassword: string);
begin
with FZConnection do begin
Protocol := aProtocol;
HostName := aHostName;
Port := aPort;
Database := aDatabase;
User := aUser;
Password := aPassword;
Connect;
with DbcConnection as IZPostgreSQLConnection do begin
FDriver := GetPlainDriver;
FHandle := GetConnectionHandle;
FDriver.SetNoticeProcessor( FHandle, SQLWaitThread_NoticeProcessor, Self);
end;
end;
end;
procedure TSQLWaitThread.SetStateStr( const aStateStr: string);
begin
if FStateStr = aStateStr
then Exit;
FStateStr := aStateStr;
// DoEvent( WTN_STATE_TEXT_CHANGED, MessageData_CreateString( FStateStr));
end;
procedure TSQLWaitThread.PGException( const aText: string);
begin
raise Exception.CreateFmt( 'Postgre error in "%s":%s%s',
[ aText, #13#10, FDriver.GetErrorMessage( FHandle)]);
end;
procedure TSQLWaitThread.Execute;
var
lpCancel: PZPostgreSQLCancel;
lpBuff: string;
lpLen: integer;
lpResult: PZPostgreSQLResult;
lpRowCount: integer;
lpRowIndex: integer;
lpFieldCount: integer;
lpFieldIndex: integer;
begin
inherited Exec;
// send query
if FDriver.SendQuery( FHandle, PChar( SQL)) <> 1
then PGException( 'SendQuery');
while not Terminated do begin
if FDriver.ConsumeInput( FHandle) <> 1
then PGException( 'ConsumeInput');
if FDriver.IsBusy( FHandle) <> 1 then begin
Break;
end;
Sleep( 10);
end;
if Terminated then begin // user press "Cancel" buttton
lpCancel := FDriver.GetCancel( FHandle); // GetMem for lpCancel record
try
lpLen := 256;
SetLength( lpBuff, lpLen);
if FDriver.Cancel( lpCancel, PChar( lpBuff), lpLen) <> 1
then PGException( 'Cancel');
SetLength( lpBuff, lpLen); // ??
finally
FDriver.FreeCancel( lpCancel); // FreeMem for lpCancel
end;
end;
FQueryResult := '';
while True // results may be more than one ( if SQL='select 1; select 2')
lpResult := FDriver.GetResult( FHandle);
if lpResult = nil
then Break;
if FQueryResult <> ''
then FQueryResult := FQueryResult + #13#10;
lpFieldCount := FDriver.GetFieldCount( lpResult);
lpRowCount := FDriver.GetRowCount( lpResult);
for lpRowIndex := 0 to lpRowCount - 1 do begin
FQueryResult := FDriver.GetValue( lpResult, lpRowIndex, 0);
for lpFieldIndex := 1 to lpFieldCount - 1 do begin
FQueryResult := FQueryResult + #9 + FDriver.GetValue( lpResult, lpRowIndex, lpFieldIndex);
end;
FQueryResult := FQueryResult + #13#10;
end;
FDriver.Clear( lpResult);
end; // while
end;
I will be grateful any amendments and comments!